## 1. Upload raw data files into S3

In [None]:
!pip install pyspark

In [None]:
!pip install boto3

1.1 Imports and Configs 

In [1]:
import pandas as pd

import configparser
from datetime import datetime
import os
from pyspark.sql import SparkSession
from pyspark import SparkConf
from pyspark.sql.functions import udf, col, concat_ws
from pyspark.sql.functions import year, month, dayofmonth, hour, weekofyear, date_format, to_timestamp, to_date

In [2]:
CONFIG_FILE = 'configs/global.cfg'

config = configparser.ConfigParser()
# ConfigParser.read() silently skips files it cannot open and returns the list
# of files it actually parsed; an empty list means the config was not loaded.
# Fail here with a clear message instead of a later NoSectionError.
if not config.read(CONFIG_FILE):
    raise FileNotFoundError("Could not read config file: " + CONFIG_FILE)

# AWS credentials are read from the config file rather than written in the notebook.
KEY = config.get('AWS', 'AWS_ACCESS_KEY_ID')
SECRET = config.get('AWS', 'AWS_SECRET_ACCESS_KEY')

# Base folders for the raw inputs and for the staged outputs.
input_path = config.get('PATH', 'INPUT_DATA_FOLDER')
output_path = config.get('PATH', 'OUTPUT_DATA_FOLDER')

# The raw sub-folders are appended by plain string concatenation, so the
# configured values must already carry any path separator they need.
raw_flight_data_path = input_path + config.get('PATH', 'FLIGHTS_RAW_FOLDER')
raw_tweets_data_path = input_path + config.get('PATH', 'TWEETS_RAW_FOLDER')

1.2 Create Spark Session

In [3]:
def create_spark_session():
    """
    Build a SparkSession, or return the existing one if there is one.

    The builder sets "spark.jars.packages" to the hadoop-aws 2.7.0 Maven
    coordinate and "dfs.client.read.shortcircuit.skip.checksum" to "true"
    before calling getOrCreate().

    Returns:
        spark -- SparkSession object
    """
    builder = SparkSession.builder
    builder = builder.config("spark.jars.packages", "org.apache.hadoop:hadoop-aws:2.7.0")
    builder = builder.config("dfs.client.read.shortcircuit.skip.checksum", "true")
    return builder.getOrCreate()

In [4]:
# Create (or reuse) the session that every later cell reads and writes through.
spark = create_spark_session()

# Shortcut to the SparkContext that belongs to the session.
sc = spark.sparkContext


1.3 Load flights df

In [5]:
# Read the CSV files under the raw flights folder (recursive lookup enabled),
# taking column names from the header row and letting Spark infer the types.
flights_df = (
    spark.read
    .options(recursiveFileLookup=True, inferSchema=True, header=True)
    .csv(raw_flight_data_path)
)

In [None]:
from pyspark.sql.functions import countDistinct, desc

# Top 10 callsigns by number of flight rows.
# countDistinct("callsign") inside groupBy("callsign") is at most 1 for every
# group, so sorting on it ranked nothing; count the rows per callsign instead.
(
    flights_df
    .groupBy("callsign")
    .count()
    .sort(desc("count"))
    .limit(10)
    .toPandas()
)

In [6]:
# Preview: pull at most 20 rows to the driver as a pandas DataFrame for display.
flights_df.limit(20).toPandas()

Unnamed: 0,callsign,number,icao24,registration,typecode,origin,destination,firstseen,lastseen,day,latitude_1,longitude_1,altitude_1,latitude_2,longitude_2,altitude_2
0,ETH728,,040188,,,KEWR,EBBR,2020-11-30 02:23:10+00:00,2020-12-01 05:42:23+00:00,2020-12-01 00:00:00+00:00,40.670083,-74.182809,0.0,50.894989,4.506102,30.48
1,TAM9560,,e48df6,PT-MUI,B77W,SBGR,OTHH,2020-11-30 02:30:53+00:00,2020-12-01 04:58:15+00:00,2020-12-01 00:00:00+00:00,-23.417545,-46.42629,1219.2,25.213297,51.645329,236.22
2,CKS416,,a96b14,N706CK,B744,KSUU,RKSG,2020-11-30 04:16:19+00:00,2020-12-01 04:57:26+00:00,2020-12-01 00:00:00+00:00,38.292023,-121.88811,304.8,36.922302,126.834352,1661.16
3,JST8992,,7c6b0b,VH-VFH,A320,WSSL,YMML,2020-11-30 05:51:16+00:00,2020-12-01 05:54:57+00:00,2020-12-01 00:00:00+00:00,1.428223,103.875034,304.8,-37.663786,144.874201,281.94
4,JST8993,,7c6b1c,VH-VFY,A320,WSSS,YMML,2020-11-30 06:11:13+00:00,2020-12-01 07:47:17+00:00,2020-12-01 00:00:00+00:00,1.316849,103.979823,304.8,-37.662643,144.854795,198.12
5,N208VA,,a1b197,N208VA,,NTAA,SBCB,2020-11-30 06:55:20+00:00,2020-12-01 02:11:16+00:00,2020-12-01 00:00:00+00:00,-17.541046,-149.589073,304.8,-22.927862,-42.103831,137.16
6,ABW193,,4243ff,VQ-BFU,B748,RKSI,EDDP,2020-11-30 06:55:39+00:00,2020-12-01 01:14:40+00:00,2020-12-01 00:00:00+00:00,37.494507,126.42482,0.0,51.413455,12.243729,121.92
7,CXA805,MF805,7812be,,,WMKK,CYVR,2020-11-30 07:11:24+00:00,2020-12-01 01:17:55+00:00,2020-12-01 00:00:00+00:00,2.740402,101.699927,0.0,49.192886,-123.092792,198.12
8,CCA595,,781161,B-1431,B789,ZGSZ,LIMC,2020-11-30 07:42:15+00:00,2020-12-01 04:05:14+00:00,2020-12-01 00:00:00+00:00,22.679214,113.786899,304.8,45.626953,8.723881,167.64
9,CLX7151,,4d010b,LX-VCV,B744,FAOR,EHAM,2020-11-30 07:49:54+00:00,2020-12-01 06:28:59+00:00,2020-12-01 00:00:00+00:00,-26.0936,28.270991,2133.6,52.300136,4.77819,-15.24


In [None]:
# Staging projection of the flights data: keep the identifying columns under
# staging names and drop rows that have no arrival airport.
# NOTE(review): "trasponder_id" looks like a misspelling of "transponder_id";
# it is kept as is because the name becomes part of the written parquet schema.
flights_staging = (
    flights_df
    .selectExpr(
        "callsign",
        "icao24 as trasponder_id",
        "registration as aircraft_id",
        "typecode as aircraft_type",
        "origin as depart_airport_id",
        "destination as arrival_airport_id",
        "firstseen as depart_at",
        "lastseen as arrival_at",
    )
    .filter("arrival_airport_id is not null")
)

In [None]:
# Preview 10 staged rows. head() with no argument returns only the first Row,
# so use toPandas() like the other preview cells to show all ten.
flights_staging.limit(10).toPandas()

Cardinality of sample test 2021-03-12: 6.109.738 rows

1.3.1 Enrich airport info

In [None]:
from pyspark import SparkFiles

# Register the OurAirports CSV with the SparkContext, then read the copy that
# SparkFiles resolves on the local filesystem.
spark.sparkContext.addFile("https://ourairports.com/data/airports.csv")

airports_local_uri = "file://" + SparkFiles.get("airports.csv")
airports_df = spark.read.csv(airports_local_uri, header=True, inferSchema=True)


In [None]:
# Staging projection of the airports data; "ident" is exposed as "code".
airports_staging = airports_df.selectExpr("id", "ident as code", "type", "name", "iso_country", "municipality")

Cardinality of full dataset test 2021-03-10: 63.078 rows

In [None]:
# Preview: pull at most 10 staged airport rows to the driver as a pandas DataFrame.
airports_staging.limit(10).toPandas()

1.4 Load Tweets df

In [5]:
# Read the JSON files under the raw tweets folder (recursive lookup enabled).
# The original also passed inferSchema/header, which are CSV reader options
# and have no meaning for the JSON reader, so they are dropped here.
tweets_df = (
    spark.read
    .options(recursiveFileLookup=True)
    .json(raw_tweets_data_path)
)

In [26]:
# Staging projection of the tweets data.
# The null check on "location" runs BEFORE the select: after the select only
# "location.country" (as "country") survives, so filtering on "location"
# afterwards works only because the analyzer resolves the dropped column.
tweets_staging = (
    tweets_df
    .filter(col("location").isNotNull())
    .select(['date', 'keywords', 'location.country', 'tweet_id'])
    # Flatten the keywords column into one comma-separated string.
    .withColumn("keywords", concat_ws(",", col("keywords")))
)

In [29]:
# Show the (column name, type) pairs of the staged tweets as a schema check.
tweets_staging.dtypes

[('date', 'string'),
 ('keywords', 'string'),
 ('country', 'string'),
 ('tweet_id', 'bigint')]

Cardinality of sample test 2021-03-12 : 2.302.853 rows

2 Load to output (S3)

In [30]:
# Write the staged tweets as parquet under the output folder, replacing any previous run.
tweets_staging.write.parquet(output_path + "/tweets.parquet", mode="overwrite")

In [None]:
# Write the staged flights as parquet under the output folder, replacing any previous run.
flights_staging.write.parquet(output_path + "/flights.parquet", mode="overwrite")

In [None]:
# Write the staged airports as parquet under the output folder, replacing any previous run.
airports_staging.write.parquet(output_path + "/airports.parquet", mode="overwrite")

In [None]:
# Read the flights output back and count its rows as a sanity check.
# The path must match the one used by the write cell, including the "/" separator.
spark.read.parquet(output_path + "/flights.parquet").count()

2.1 Remove .crc extension files

In [31]:
def remove_crc_files(parquet_directory):
    """
    Delete every entry directly inside a directory whose name ends with ".crc".

    Only the top level of the directory is listed; sub-directories are not visited.

    Arguments:
        parquet_directory -- path of the directory to clean
    """
    crc_names = [name for name in os.listdir(parquet_directory) if name.endswith(".crc")]
    for crc_name in crc_names:
        os.remove(os.path.join(parquet_directory, crc_name))

# Remove the .crc side-files from the flights and tweets parquet directories.
for parquet_name in ("/flights.parquet", "/tweets.parquet"):
    remove_crc_files(output_path + parquet_name)

3 Stop spark

In [None]:
# Stop the Spark session now that all outputs are written.
spark.stop()