Our link is : "https://openflights.org/data.html"

In [2]:
%%writefile spark_analysis.py
import matplotlib 
matplotlib.use('agg') 
import argparse 
parser = argparse.ArgumentParser() 
parser.add_argument("--bucket", help="bucket for input and output") 
args = parser.parse_args() 
BUCKET = args.bucket

Writing spark_analysis.py


#### Reading in data

In [3]:
%%writefile -a spark_analysis.py
from pyspark.sql import SparkSession, SQLContext, Row


gcs_bucket = 'dataproc-staging-us-central1-136658108854-k7f5ytpn'
spark = SparkSession.builder.appName("airflight").getOrCreate()

sc = spark.sparkContext
data_file = "gs://" + gcs_bucket + "//routes.dat"
raw_rdd = sc.textFile(data_file).cache()
raw_rdd.take(5)

Appending to spark_analysis.py


In [4]:
%%writefile -a spark_analysis.py
csv_rdd = raw_rdd.map(lambda row: row.split(","))
parsed_rdd = csv_rdd.map(lambda r: Row(
    airline = str(r[0]),
    airline_id = r[1],
    source_airport = str(r[2]),
    source_airport_id = r[3],
    dest_airport = str(r[4]),
    dest_airport_id = 0 if ("\\N" in r[5]) else int(r[5])   
    )             
)
parsed_rdd.take(35)

Appending to spark_analysis.py


#### Spark Analysis

In [5]:
%%writefile -a spark_analysis.py
sqlContext = SQLContext(sc)
df = sqlContext.createDataFrame(parsed_rdd)

Appending to spark_analysis.py


In [6]:
%%writefile -a spark_analysis.py
df.registerTempTable("routes")
best20airlines = sqlContext.sql("""SELECT airline,COUNT(dest_airport_id) AS countdestination FROM routes GROUP BY airline ORDER BY countdestination DESC LIMIT 20""")
best20airlines.show()

Appending to spark_analysis.py


In [7]:
%%writefile -a spark_analysis.py
ax = best20airlines.toPandas().plot.bar(x='airline', subplots=True, figsize=(10,25))

Appending to spark_analysis.py


In [8]:
%%writefile -a spark_analysis.py
ax[0].get_figure().savefig('report.png');

Appending to spark_analysis.py


In [9]:
%%writefile -a spark_analysis.py
import google.cloud.storage as gcs 
bucket = gcs.Client().get_bucket(BUCKET) 
for blob in bucket.list_blobs(prefix='sparktodp/'): 
    blob.delete() 
bucket.blob('sparktodp/report.png').upload_from_filename('report.png')

Appending to spark_analysis.py


In [10]:
%%writefile -a spark_analysis.py
connections_by_protocol.write.format("csv").mode("overwrite").save(
    "gs://{}/sparktodp/connections_by_protocol".format(BUCKET))

Appending to spark_analysis.py


In [11]:
BUCKET_list = !gcloud info --format='value(config.project)' 
BUCKET=BUCKET_list[0] 
print('Writing to {}'.format(BUCKET)) 
!/opt/conda/miniconda3/bin/python spark_analysis.py --bucket=$BUCKET

Writing to deproject2
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
+-------+----------------+                                                      
|airline|countdestination|
+-------+----------------+
|     FR|            2484|
|     AA|            2354|
|     UA|            2180|
|     DL|            1981|
|     US|            1960|
|     CZ|            1454|
|     MU|            1263|
|     CA|            1260|
|     WN|            1146|
|     U2|            1130|
|     AF|            1071|
|     LH|             923|
|     AZ|             877|
|     IB|             831|
|     KL|             830|
|     ZH|             815|
|     AB|             798|
|     FL|             726|
|     AC|             705|
|     TK|             658|
+-------+----------------+

Traceback (most recent call last):
  File "spark_analysis.py", line 41, in <module>
    connections_by_protocol.write.format("csv").mode("overwri

In [12]:
!gsutil ls gs://$BUCKET/sparktodp/**

gs://deproject2/sparktodp/report.png


In [13]:
!gsutil cp spark_analysis.py gs://$BUCKET/sparktodp/spark_analysis.py

Copying file://spark_analysis.py [Content-Type=text/x-python]...
/ [1 files][  1.6 KiB/  1.6 KiB]                                                
Operation completed over 1 objects/1.6 KiB.                                      
