In [None]:
from pyspark.sql import *
from pyspark.sql.functions import *
from pyspark import SparkContext

# Initialize

In [None]:
# Obtain the active SparkSession, or start a new one if none exists yet.
spark = (
    SparkSession
    .builder
    .getOrCreate()
)

# Keep a handle on the underlying SparkContext for the RDD API.
sc = spark.sparkContext

# Last expression of the cell: display the session summary.
spark

### Read files 

In [None]:
# Load the two gzipped JSON datasets into Spark DataFrames
# (no explicit schema is passed, so Spark infers it from the data).
Bombing_Operations = spark.read.json("Bombing_Operations.json.gz")
Aircraft_Glossary = spark.read.json("Aircraft_Glossary.json.gz")

# Print the column names and types of both DataFrames.
Bombing_Operations.printSchema()
Aircraft_Glossary.printSchema()
# NOTE: a trailing `posts_merged.show()` call used to live here; `posts_merged`
# is not defined anywhere before this cell, so it raised NameError on a fresh
# kernel and has been dropped.

In [None]:
# Fetch the first three operations as a list of Row objects.
# (Not the last expression of the cell, so the result is not displayed.)
Bombing_Operations.take(num=3)

# `show()` prints a formatted table of the first rows instead.
Aircraft_Glossary.show()

# Count every row of the operations dataset and report the total.
print(
    "In total there are {0} operations".format(
        Bombing_Operations.count()
    )
)

### Move to pandas if possible

In [None]:
# Collect the Spark result to the driver as a pandas DataFrame.
# `missions_counts` is built in the "Group by" cell further down this notebook,
# so that cell has to be executed before this one.
missions_count_pd = missions_counts.toPandas()

# Preview the first five rows.
missions_count_pd.head(n=5)

### Save

In [None]:
# Persist the DataFrame as JSON, replacing any previous output at that path.
jun_29_operations.write.json("jun_29_operations.json", mode='overwrite')

# Read the saved JSON back and rebind the name to the reloaded DataFrame.
jun_29_operations = spark.read.json(path="jun_29_operations.json")

# Operations

### Merge

In [None]:
# Inner join of the two DataFrames on their shared "id" column.
merged_df = rd_messages.join(
    other=rd_score,
    on="id",
    how="inner",
)
merged_df.show()

### Group by

In [None]:
# Number of missions per country, largest count first.
missions_counts = (
    Bombing_Operations
    .groupBy("ContryFlyingMission")
    .agg(count("*").alias("MissionsCount"))
    .orderBy(desc("MissionsCount"))
)
missions_counts.show()

The same aggregation expressed with Spark SQL:

In [None]:
# Expose the DataFrame to Spark SQL under the name "Bombing_Operations".
# `createOrReplaceTempView` replaces `registerTempTable`, which has been
# deprecated since Spark 2.0; re-running the cell simply replaces the view.
Bombing_Operations.createOrReplaceTempView("Bombing_Operations")

# Count missions per country and order by that count, largest first.
query = """
SELECT ContryFlyingMission, count(*) as MissionsCount
FROM Bombing_Operations
GROUP BY ContryFlyingMission
ORDER BY MissionsCount DESC
"""

missions_counts = spark.sql(query)
missions_counts.show()

### selectExpr

Keywords: `group by`, `parse date`, `plot`

Select the relevant columns:

In [None]:
# Keep only the mission date (parsed with SQL `to_date`) and the country column.
missions_countries = Bombing_Operations.selectExpr(
    "to_date(MissionDate) as MissionDate",
    "ContryFlyingMission",
)

# Last expression of the cell: display the resulting DataFrame's repr.
missions_countries

Keywords: `RDD map reduce` `cache` `save results`

<img style="float: right;" src="img/Hanoi_POL1966.jpg">

This picture shows the Hanoi POL facility (North Vietnam) burning after it was attacked by the U.S. Air Force on 29 June 1966, as part of Operation Rolling Thunder.

We want to find the most common take-off location on that day.

In [None]:
# Restrict the dataset to missions on 29 June 1966 that targeted North Vietnam.
jun_29_operations = Bombing_Operations.filter(
    "MissionDate = '1966-06-29' AND TargetCountry='NORTH VIETNAM'"
)

# Missions per country for that day, collected to pandas for display.
(
    jun_29_operations
    .groupBy("ContryFlyingMission")
    .agg(count("*").alias("MissionsCount"))
    .toPandas()
)

Specify to cache the content in memory:

In [None]:
# Mark the filtered DataFrame for caching. `cache()` is lazy: nothing is
# stored until an action runs on the DataFrame.
jun_29_operations.cache()
# `count()` is an action, so this first call populates the cache; the IPython
# `%time` magic reports how long it took.
%time jun_29_operations.count()

# RDDs

In [None]:
# Map step: emit a (take-off location, 1) pair for every operation row.
all_locations = jun_29_operations.rdd.map(
    lambda op: (op.TakeoffLocation, 1)
)

# Reduce step: add up the ones per location, then sort by the negated count
# so the most frequent location comes first.
locations_counts_rdd = (
    all_locations
    .reduceByKey(lambda left, right: left + right)
    .sortBy(lambda pair: -pair[1])
)
