In [1]:
from google.colab import drive
drive.mount('/content/gdrive')

Mounted at /content/gdrive


In [2]:
!unzip gdrive/My\ Drive/Data/Data.zip > /dev/null

In [3]:
!pip install pyspark
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-11-openjdk-amd64"

Collecting pyspark
  Downloading pyspark-3.2.1.tar.gz (281.4 MB)
[K     |████████████████████████████████| 281.4 MB 33 kB/s 
[?25hCollecting py4j==0.10.9.3
  Downloading py4j-0.10.9.3-py2.py3-none-any.whl (198 kB)
[K     |████████████████████████████████| 198 kB 42.7 MB/s 
[?25hBuilding wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.2.1-py2.py3-none-any.whl size=281853642 sha256=7ac21a5de0b54572c07d352504432123ce744babf54fb2e8e6727e9f0801892b
  Stored in directory: /root/.cache/pip/wheels/9f/f5/07/7cd8017084dce4e93e84e92efd1e1d5334db05f2e83bcef74f
Successfully built pyspark
Installing collected packages: py4j, pyspark
Successfully installed py4j-0.10.9.3 pyspark-3.2.1


In [4]:
from pyspark.sql import *
from pyspark import SparkContext, SparkConf

In [5]:
conf = SparkConf().setAppName("My App").setMaster("local")

sc = SparkContext(conf=conf)
spark = SparkSession.builder.getOrCreate()

In [6]:
Bombing_Operations = spark.read.json(".//Bombing_Operations.json")
Aircraft_Glossary = spark.read.json(".//Aircraft_Glossary.json")

# **1. Những quốc gia nào tham gia và thực hiện bao nhiêu phi vụ trong cuộc chiến?**

In [8]:
Bombing_Operations.registerTempTable("Bombing_Operations")

query = """
SELECT ContryFlyingMission, count(*) as MissionsCount
FROM Bombing_Operations
GROUP BY ContryFlyingMission
ORDER BY MissionsCount DESC
"""

missions_counts = spark.sql(query).toPandas()
missions_counts.head()



Unnamed: 0,ContryFlyingMission,MissionsCount
0,UNITED STATES OF AMERICA,3708997
1,VIETNAM (SOUTH),622013
2,LAOS,32777
3,KOREA (SOUTH),24469
4,AUSTRALIA,12519


# **2. Hiển thị số lượng phi vụ theo thời gian cho mỗi quốc gia liên quan?**

In [9]:
from pyspark.sql.functions import *

missions_by_date = Bombing_Operations.selectExpr(["to_date(MissionDate) as MissionDate", "ContryFlyingMission"])\
                    .groupBy(["MissionDate", "ContryFlyingMission"])\
                    .agg(count("*").alias("MissionsCount"))\
                    .sort(asc("MissionDate")).toPandas()
                    
missions_by_date.head()

Unnamed: 0,MissionDate,ContryFlyingMission,MissionsCount
0,1965-10-01,UNITED STATES OF AMERICA,447
1,1965-10-02,UNITED STATES OF AMERICA,652
2,1965-10-03,UNITED STATES OF AMERICA,608
3,1965-10-04,UNITED STATES OF AMERICA,532
4,1965-10-05,UNITED STATES OF AMERICA,697


# **3. Ai đã đánh bom địa điểm này?**

In [10]:
jun_29_operations = Bombing_Operations.where("MissionDate = '1966-06-29' AND TargetCountry='NORTH VIETNAM'")

jun_29_operations.write.mode('overwrite').json("jun_29_operations.json")

TakeoffLocationCounts = spark.read.json("jun_29_operations.json")\
                            .groupBy("TakeoffLocation").agg(count("*").alias("MissionsCount"))\
                            .sort(desc("MissionsCount")).toPandas()
TakeoffLocationCounts.head()

Unnamed: 0,TakeoffLocation,MissionsCount
0,CONSTELLATION,87
1,TAKHLI,56
2,KORAT,55
3,UDORN AB,44
4,UBON AB,44


# **4. Hãy cho biết loại máy bay được sử dụng nhiều nhất trong chiến tranh Việt Nam(loại máy bay thực hiện số lượng phi vụ nhiều nhất) là gì?**

In [11]:
Bombing_Operations.registerTempTable("Bombing_Operations")
Aircraft_Glossary.registerTempTable("Aircraft_Glossary")

query = """
SELECT AirCraftType, count(*) MissionsCount
FROM Bombing_Operations 
JOIN Aircraft_Glossary
ON Bombing_Operations.AirCraft = Aircraft_Glossary.AirCraft
GROUP BY AirCraftType
ORDER BY MissionsCount DESC
"""

missions_aircrafts = spark.sql(query).toPandas()
missions_aircrafts.head()



Unnamed: 0,AirCraftType,MissionsCount
0,Fighter Jet Bomber,1073126
1,Fighter Jet,882594
2,Jet Fighter Bomber,451385
3,Attack Aircraft,315246
4,Light ground-attack aircraft,267457
