# First-Order Edges with PySpark
This notebook exports the weighted edge list of a first-order airport network built from the 2011 Q1 `Coupon.csv` data. **Don't forget to kill the session at the end with `spark.stop()`!**

```python
import findspark
findspark.init('/usr/hdp/current/spark2-client')  # point findspark at the cluster's Spark 2 install

from pyspark import SparkConf
from pyspark.sql import SparkSession

# "yarn-client" is deprecated as a master URL since Spark 2.0; "yarn" runs in client mode by default
conf = (SparkConf().setMaster("yarn").setAppName("1stOrderAirlineDataAnalysis")
        .set("spark.yarn.queue", "eecs598w19")
        .set("spark.executor.memory", "4g")
        .set("spark.executor.instances", "10")
        .set("spark.driver.memory", "4g")
        # Dynamic allocation on YARN requires the external shuffle service
        .set("spark.shuffle.service.enabled", "true")
        .set("spark.dynamicAllocation.enabled", "true")
        .set("spark.dynamicAllocation.minExecutors", "4")
        )

spark = SparkSession.builder.config(conf=conf).getOrCreate()
spark.sparkContext.setLogLevel("ERROR")  # Hides irrelevant warnings caused by workers defaulting to Python 2
```

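```python
import signac
from pyspark.sql.functions import count
from util import hdfs_fn  # project-local helper that builds the HDFS path for a file belonging to a job

project = signac.get_project()
# Take the first job whose state point matches 2011, Q1
# (JobsCursor.next() is deprecated; use the built-in next() on an iterator)
job = next(iter(project.find_jobs({"year": 2011, "quarter": 1})))
```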

```python
df = spark.read.csv(hdfs_fn(job, 'Coupon.csv'), header=True, inferSchema=True)
```

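```python
col_names = ['ItinID', 'OriginAirportID', 'DestAirportID']
# Keep only the columns needed for the network and partition by origin airport
df_network = df.select(col_names).repartition('OriginAirportID')
# One edge per (origin, destination) pair; weight = number of coupon rows for that pair
edges = (df_network.groupBy('OriginAirportID', 'DestAirportID')
         .agg(count('ItinID').alias('weight')))
```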
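To see what the aggregation computes, here is a plain-Python equivalent run on a few made-up coupon rows (the airport IDs below are hypothetical). It mirrors `groupBy(...).agg(count('ItinID'))`: each distinct (origin, destination) pair becomes one edge, and its weight is the number of rows with a non-null `ItinID`, since `count` on a column skips nulls.

```python
from collections import Counter

# Hypothetical sample rows: (ItinID, OriginAirportID, DestAirportID)
coupons = [
    (1, 101, 102),
    (1, 102, 103),
    (2, 101, 102),
    (3, 103, 101),
    (None, 101, 102),  # skipped: count('ItinID') ignores null values
]

def first_order_edges(rows):
    """Count rows per (origin, destination) pair, ignoring rows with a null ItinID."""
    return Counter((origin, dest) for itin, origin, dest in rows if itin is not None)

edges = first_order_edges(coupons)
print(edges[(101, 102)])  # 2
```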

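```python
def make_line(row):
    """Format one edge Row as 'origin<TAB>destination<TAB>weight'."""
    return '{}\t{}\t{}'.format(row.OriginAirportID, row.DestAirportID, row.weight)

# Writes a directory of part files; fails if the target path already exists
edges.rdd.map(make_line).saveAsTextFile(hdfs_fn(job, 'edges.tsv'))
```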
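The line format produced by `make_line` can be checked without a cluster. This sketch uses a `namedtuple` called `EdgeRow` (a hypothetical stand-in, not part of PySpark) because, like `pyspark.sql.Row`, it supports attribute access to the three fields.

```python
from collections import namedtuple

# Hypothetical stand-in for a pyspark.sql.Row with the same field names
EdgeRow = namedtuple('EdgeRow', ['OriginAirportID', 'DestAirportID', 'weight'])

def make_line(row):
    """Format one edge as 'origin<TAB>destination<TAB>weight'."""
    return '{}\t{}\t{}'.format(row.OriginAirportID, row.DestAirportID, row.weight)

line = make_line(EdgeRow(101, 102, 2))
print(repr(line))  # '101\t102\t2'
```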

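This leaves the output in an HDFS directory named `edges.tsv` that holds one file per partition (`part-00000`, `part-00001`, ...). The part files can be combined into a single file with `cat`, or merged straight out of HDFS with `hdfs dfs -getmerge`.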
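If the directory has already been copied to local disk (for example with `hdfs dfs -get`), the part files can also be merged from Python. This is a minimal sketch: `merge_parts` is a hypothetical helper, and it assumes the local copy contains the `part-*` files (Spark's `_SUCCESS` marker does not match the glob). Sorting by name keeps the parts in partition order, which only matters for a reproducible file; the set of edges is the same either way.

```python
import glob
import os
import shutil
import tempfile

def merge_parts(part_dir, out_path):
    """Concatenate part-* files from a local copy of the output directory, in name order."""
    parts = sorted(glob.glob(os.path.join(part_dir, 'part-*')))
    with open(out_path, 'wb') as out:
        for part in parts:
            with open(part, 'rb') as src:
                shutil.copyfileobj(src, out)
    return len(parts)

# Demo on a temporary directory with two made-up part files (written out of order)
demo_dir = tempfile.mkdtemp()
for name, text in [('part-00001', '102\t103\t1\n'), ('part-00000', '101\t102\t2\n')]:
    with open(os.path.join(demo_dir, name), 'w') as f:
        f.write(text)

merged = os.path.join(demo_dir, 'edges_merged.tsv')
n_parts = merge_parts(demo_dir, merged)
with open(merged) as f:
    merged_text = f.read()
print(n_parts)  # 2
```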

The files are tab-separated with no header row. The columns are `Origin Airport ID`, `Destination Airport ID`, and `Number of Connections (weight)`.
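Because every (origin, destination) pair appears exactly once after the `groupBy`, the merged file can be read back into a dictionary keyed by airport pair. A minimal sketch, assuming the airport IDs are integers; `read_edges` is a hypothetical helper and the two sample lines are made up.

```python
import io

def read_edges(lines):
    """Parse 'origin<TAB>dest<TAB>weight' lines into a {(origin, dest): weight} dict."""
    edges = {}
    for line in lines:
        line = line.rstrip('\n')
        if not line:
            continue  # tolerate a trailing blank line
        origin, dest, weight = line.split('\t')
        edges[(int(origin), int(dest))] = int(weight)
    return edges

# In practice, pass an open file handle for the merged edges file instead
edges = read_edges(io.StringIO('101\t102\t2\n102\t103\t1\n'))
print(edges)  # {(101, 102): 2, (102, 103): 1}
```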

```python
spark.stop()
```