In [1]:
import pyspark
from functools import reduce
from pyspark.sql import SparkSession
from pyspark.conf import SparkConf
from pyspark.context import SparkContext
from pyspark.sql import functions as sf

In [2]:
spark = SparkSession \
        .builder \
        .master("spark://spark-master:7077") \
        .config(
        "spark.jars",
        "/opt/bitnami/spark/jars/gcs-connector-hadoop3-latest.jar,"
        "/opt/bitnami/spark/jars/spark-bigquery-with-dependencies_2.12-0.42.4.jar"
        ) \
        .appName('gcs-bq-pyspark') \
        .config("spark.executor.instances", 1) \
        .config("spark.executor.cores", 4) \
        .config("spark.executor.memory", "8g") \
        .getOrCreate()

25/08/14 02:12:19 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


In [3]:
spark.sparkContext.setLogLevel("WARN")

spark._jsc.hadoopConfiguration().set('fs.gs.impl', 'com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem')
spark._jsc.hadoopConfiguration().set('fs.gs.auth.service.account.enable', 'true')
spark._jsc.hadoopConfiguration().set('google.cloud.auth.service.account.json.keyfile', "/opt/keys/credentials.json")

In [4]:
# Read from CSV files
bucket_name = "data_expo_bucket"
files = [f"gs://{bucket_name}/{year}_data.csv" for year in range(2000, 2009)]
files

['gs://data_expo_bucket/2000_data.csv',
 'gs://data_expo_bucket/2001_data.csv',
 'gs://data_expo_bucket/2002_data.csv',
 'gs://data_expo_bucket/2003_data.csv',
 'gs://data_expo_bucket/2004_data.csv',
 'gs://data_expo_bucket/2005_data.csv',
 'gs://data_expo_bucket/2006_data.csv',
 'gs://data_expo_bucket/2007_data.csv',
 'gs://data_expo_bucket/2008_data.csv']

In [5]:
df = spark.read.option("header", "true").csv(files)

                                                                                

In [6]:
# Check DataFrame's schema
df.schema

StructType([StructField('Year', StringType(), True), StructField('Month', StringType(), True), StructField('DayofMonth', StringType(), True), StructField('DayOfWeek', StringType(), True), StructField('DepTime', StringType(), True), StructField('CRSDepTime', StringType(), True), StructField('ArrTime', StringType(), True), StructField('CRSArrTime', StringType(), True), StructField('UniqueCarrier', StringType(), True), StructField('FlightNum', StringType(), True), StructField('TailNum', StringType(), True), StructField('ActualElapsedTime', StringType(), True), StructField('CRSElapsedTime', StringType(), True), StructField('AirTime', StringType(), True), StructField('ArrDelay', StringType(), True), StructField('DepDelay', StringType(), True), StructField('Origin', StringType(), True), StructField('Dest', StringType(), True), StructField('Distance', StringType(), True), StructField('TaxiIn', StringType(), True), StructField('TaxiOut', StringType(), True), StructField('Cancelled', StringType

In [7]:
# Take a look at some records
df.take(5)

25/08/14 02:12:26 WARN SparkStringUtils: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.
                                                                                

[Row(Year='2000', Month='1', DayofMonth='28', DayOfWeek='5', DepTime='1647', CRSDepTime='1647', ArrTime='1906', CRSArrTime='1859', UniqueCarrier='HP', FlightNum='154', TailNum='N808AW', ActualElapsedTime='259', CRSElapsedTime='252', AirTime='233', ArrDelay='7', DepDelay='0', Origin='ATL', Dest='PHX', Distance='1587', TaxiIn='15', TaxiOut='11', Cancelled='0', CancellationCode='NA', Diverted='0', CarrierDelay='NA', WeatherDelay='NA', NASDelay='NA', SecurityDelay='NA', LateAircraftDelay='NA'),
 Row(Year='2000', Month='1', DayofMonth='29', DayOfWeek='6', DepTime='1648', CRSDepTime='1647', ArrTime='1939', CRSArrTime='1859', UniqueCarrier='HP', FlightNum='154', TailNum='N653AW', ActualElapsedTime='291', CRSElapsedTime='252', AirTime='239', ArrDelay='40', DepDelay='1', Origin='ATL', Dest='PHX', Distance='1587', TaxiIn='5', TaxiOut='47', Cancelled='0', CancellationCode='NA', Diverted='0', CarrierDelay='NA', WeatherDelay='NA', NASDelay='NA', SecurityDelay='NA', LateAircraftDelay='NA'),
 Row(Yea

In [8]:
# Drop NULL columns
cols_to_drop = ['CancellationCode', 'CarrierDelay', 'WeatherDelay', 'NASDelay', 'SecurityDelay', 'LateAircraftDelay']
df = df.drop(*cols_to_drop)

In [9]:
# Check DataFrame's schema after dropping columns
df.schema

StructType([StructField('Year', StringType(), True), StructField('Month', StringType(), True), StructField('DayofMonth', StringType(), True), StructField('DayOfWeek', StringType(), True), StructField('DepTime', StringType(), True), StructField('CRSDepTime', StringType(), True), StructField('ArrTime', StringType(), True), StructField('CRSArrTime', StringType(), True), StructField('UniqueCarrier', StringType(), True), StructField('FlightNum', StringType(), True), StructField('TailNum', StringType(), True), StructField('ActualElapsedTime', StringType(), True), StructField('CRSElapsedTime', StringType(), True), StructField('AirTime', StringType(), True), StructField('ArrDelay', StringType(), True), StructField('DepDelay', StringType(), True), StructField('Origin', StringType(), True), StructField('Dest', StringType(), True), StructField('Distance', StringType(), True), StructField('TaxiIn', StringType(), True), StructField('TaxiOut', StringType(), True), StructField('Cancelled', StringType

In [10]:
# Repartition for safe parallelism
df = df.repartition(40)

In [11]:
# Write to Parquet in GCS
df.write.mode("overwrite").parquet(f"gs://{bucket_name}/all_years_parquet")

                                                                                

In [12]:
df_parquet = spark.read.parquet(f"gs://{bucket_name}/all_years_parquet")

                                                                                

In [13]:
df_parquet.take(5)

                                                                                

[Row(Year='2000', Month='3', DayofMonth='9', DayOfWeek='4', DepTime='902', CRSDepTime='900', ArrTime='1450', CRSArrTime='1433', UniqueCarrier='DL', FlightNum='308', TailNum='N668DN', ActualElapsedTime='228', CRSElapsedTime='213', AirTime='197', ArrDelay='17', DepDelay='2', Origin='PHX', Dest='ATL', Distance='1587', TaxiIn='7', TaxiOut='24', Cancelled='0', Diverted='0'),
 Row(Year='2000', Month='1', DayofMonth='3', DayOfWeek='1', DepTime='1148', CRSDepTime='1150', ArrTime='1418', CRSArrTime='1424', UniqueCarrier='US', FlightNum='1640', TailNum='N427US', ActualElapsedTime='150', CRSElapsedTime='154', AirTime='120', ArrDelay='-6', DepDelay='-2', Origin='PBI', Dest='PIT', Distance='952', TaxiIn='21', TaxiOut='9', Cancelled='0', Diverted='0'),
 Row(Year='2000', Month='1', DayofMonth='10', DayOfWeek='1', DepTime='1406', CRSDepTime='1340', ArrTime='1455', CRSArrTime='1430', UniqueCarrier='US', FlightNum='2122', TailNum='N850US', ActualElapsedTime='49', CRSElapsedTime='50', AirTime='33', ArrDe

In [14]:
df_parquet.count()

                                                                                

54664946

In [15]:
df_parquet.select([sf.count(sf.when(sf.col(c).isin(['null', 'NULL', 'NA', 'NaN']) | sf.col(c).isNull(), c)).alias(c) for c in df_parquet.columns]).show()



+----+-----+----------+---------+-------+----------+-------+----------+-------------+---------+-------+-----------------+--------------+-------+--------+--------+------+----+--------+------+-------+---------+--------+
|Year|Month|DayofMonth|DayOfWeek|DepTime|CRSDepTime|ArrTime|CRSArrTime|UniqueCarrier|FlightNum|TailNum|ActualElapsedTime|CRSElapsedTime|AirTime|ArrDelay|DepDelay|Origin|Dest|Distance|TaxiIn|TaxiOut|Cancelled|Diverted|
+----+-----+----------+---------+-------+----------+-------+----------+-------------+---------+-------+-----------------+--------------+-------+--------+--------+------+----+--------+------+-------+---------+--------+
|   0|    0|         0|        0|1193911|         0|1307642|         0|            0|        0|  98861|          1307643|          1675|1307763| 1307643| 1193911|     0|   0|       0| 70096|  64442|        0|       0|
+----+-----+----------+---------+-------+----------+-------+----------+-------------+---------+-------+-----------------+-------

                                                                                

In [16]:
# Filter missing TaxiIn and TaxiOut
df_parquet = df_parquet.filter(~(sf.col("TaxiIn").isNull() | sf.col("TaxiOut").isNull() | sf.col("TaxiIn").isin(['null', 'NULL', 'NA', 'NaN']) | sf.col("TaxiOut").isin(['null', 'NULL', 'NA', 'NaN'])))

In [17]:
# Check NULL values after removing records
df_parquet.select([sf.count(sf.when(sf.col(c).isin(['null', 'NULL', 'NA', 'NaN']) | sf.col(c).isNull(), c)).alias(c) for c in df_parquet.columns]).show()



+----+-----+----------+---------+-------+----------+-------+----------+-------------+---------+-------+-----------------+--------------+-------+--------+--------+------+----+--------+------+-------+---------+--------+
|Year|Month|DayofMonth|DayOfWeek|DepTime|CRSDepTime|ArrTime|CRSArrTime|UniqueCarrier|FlightNum|TailNum|ActualElapsedTime|CRSElapsedTime|AirTime|ArrDelay|DepDelay|Origin|Dest|Distance|TaxiIn|TaxiOut|Cancelled|Diverted|
+----+-----+----------+---------+-------+----------+-------+----------+-------------+---------+-------+-----------------+--------------+-------+--------+--------+------+----+--------+------+-------+---------+--------+
|   0|    0|         0|        0|1129469|         0|1237546|         0|            0|        0|  56411|          1237547|          1268|1237667| 1237547| 1129469|     0|   0|       0|     0|      0|        0|       0|
+----+-----+----------+---------+-------+----------+-------+----------+-------------+---------+-------+-----------------+-------

                                                                                

In [18]:
# Filter missing CRS Elapsed Time
df_parquet = df_parquet.filter(~(sf.col("CRSElapsedTime").isNull() | sf.col("CRSElapsedTime").isin(['null', 'NULL', 'NA', 'NaN'])))

In [19]:
# Modify Tail Num information for unknown tail number
df_parquet = df_parquet.withColumn("TailNum", sf.when((sf.col("TailNum") == "UNKNOW") | sf.col("TailNum").isin(['null', 'NULL', 'NA', 'NaN']) | sf.col("TailNum").isNull(), "Unknown Tail Number") \
    .otherwise(sf.col("TailNum")))

In [20]:
# Remove non-ASCII characters
df_parquet = df_parquet.withColumn(
    "TailNum",
    sf.regexp_replace("TailNum", "[^\x20-\x7E]", "")
)

In [21]:
# Define condition to create a new column
missing_values = ['null', 'NULL', 'NA', 'NaN']

cols_to_check = [
    "DepTime", "ArrTime", "ActualElapsedTime", 
    "AirTime", "ArrDelay", "DepDelay"
]

conditions = [
    (sf.col(c).isNull()) | (sf.col(c).isin(missing_values))
    for c in cols_to_check
]

In [22]:
# (((cond1 | cond2) | cond3) | cond4) | cond5) | cond6
combined_condition = reduce(lambda a, b: a | b, conditions)

In [23]:
# Create a new column based on our condition
df_parquet = df_parquet.withColumn(
    "MissingData",
    sf.when(combined_condition, 1).otherwise(0)
)

In [24]:
# Show number of records that need modification
df_parquet.groupBy("MissingData").count().show()



+-----------+--------+
|MissingData|   count|
+-----------+--------+
|          1| 1236400|
|          0|53357182|
+-----------+--------+



                                                                                

In [25]:
# Fill in NULL values from CRS columns data
df_parquet = df_parquet.withColumns({
    "DepTime":
    sf.when(sf.col("DepTime").isNull() | sf.col("DepTime").isin(missing_values), sf.col("CRSDepTime")).otherwise(sf.col("DepTime")),
    "ArrTime":
    sf.when(sf.col("ArrTime").isNull() | sf.col("ArrTime").isin(missing_values), sf.col("CRSArrTime")).otherwise(sf.col("ArrTime")),
    "ActualElapsedTime":
    sf.when(sf.col("ActualElapsedTime").isNull() | sf.col("ActualElapsedTime").isin(missing_values), sf.col("CRSElapsedTime")).otherwise(sf.col("ActualElapsedTime")),
    "DepDelay":
    sf.when(sf.col("DepDelay").isNull() | sf.col("DepDelay").isin(missing_values), "0").otherwise(sf.col("DepDelay")),
    "ArrDelay":
    sf.when(sf.col("ArrDelay").isNull() | sf.col("ArrDelay").isin(missing_values), "0").otherwise(sf.col("ArrDelay"))
})

In [26]:
# Check NULL values once again
df_parquet.select([sf.count(sf.when(sf.col(c).isin(['null', 'NULL', 'NA', 'NaN']) | sf.col(c).isNull(), c)).alias(c) for c in df_parquet.columns]).show()



+----+-----+----------+---------+-------+----------+-------+----------+-------------+---------+-------+-----------------+--------------+-------+--------+--------+------+----+--------+------+-------+---------+--------+-----------+
|Year|Month|DayofMonth|DayOfWeek|DepTime|CRSDepTime|ArrTime|CRSArrTime|UniqueCarrier|FlightNum|TailNum|ActualElapsedTime|CRSElapsedTime|AirTime|ArrDelay|DepDelay|Origin|Dest|Distance|TaxiIn|TaxiOut|Cancelled|Diverted|MissingData|
+----+-----+----------+---------+-------+----------+-------+----------+-------------+---------+-------+-----------------+--------------+-------+--------+--------+------+----+--------+------+-------+---------+--------+-----------+
|   0|    0|         0|        0|      0|         0|      0|         0|            0|        0|      0|                0|             0|1236399|       0|       0|     0|   0|       0|     0|      0|        0|       0|          0|
+----+-----+----------+---------+-------+----------+-------+----------+---------

                                                                                

In [27]:
# Filling Air Time
df_parquet = df_parquet.withColumn(
    "AirTime",
    sf.when(sf.col("AirTime").isNull() | sf.col("AirTime").isin(missing_values), (sf.col("ActualElapsedTime").cast('int') - sf.col("TaxiIn").cast('int') - sf.col("TaxiOut").cast('int')).cast('string')).otherwise(sf.col("AirTime"))
)

In [28]:
# Check NULL values once again
df_parquet.select([sf.count(sf.when(sf.col(c).isin(['null', 'NULL', 'NA', 'NaN']) | sf.col(c).isNull(), c)).alias(c) for c in df_parquet.columns]).show()



+----+-----+----------+---------+-------+----------+-------+----------+-------------+---------+-------+-----------------+--------------+-------+--------+--------+------+----+--------+------+-------+---------+--------+-----------+
|Year|Month|DayofMonth|DayOfWeek|DepTime|CRSDepTime|ArrTime|CRSArrTime|UniqueCarrier|FlightNum|TailNum|ActualElapsedTime|CRSElapsedTime|AirTime|ArrDelay|DepDelay|Origin|Dest|Distance|TaxiIn|TaxiOut|Cancelled|Diverted|MissingData|
+----+-----+----------+---------+-------+----------+-------+----------+-------------+---------+-------+-----------------+--------------+-------+--------+--------+------+----+--------+------+-------+---------+--------+-----------+
|   0|    0|         0|        0|      0|         0|      0|         0|            0|        0|      0|                0|             0|      0|       0|       0|     0|   0|       0|     0|      0|        0|       0|          0|
+----+-----+----------+---------+-------+----------+-------+----------+---------

                                                                                

In [29]:
# Columns to convert to integer
int_cols = [
    "Year", "Month", "DayofMonth", "DayOfWeek",
    "DepTime", "CRSDepTime", "ArrTime", "CRSArrTime",
    "FlightNum", "ActualElapsedTime", "CRSElapsedTime",
    "AirTime", "ArrDelay", "DepDelay", "Distance",
    "TaxiIn", "TaxiOut", "Cancelled", "Diverted"
]

In [30]:
# Convert type
for c in int_cols:
    df_parquet = df_parquet.withColumn(c, sf.col(c).cast("int"))

In [31]:
# Check DataFrame schema after converting
df_parquet.schema

StructType([StructField('Year', IntegerType(), True), StructField('Month', IntegerType(), True), StructField('DayofMonth', IntegerType(), True), StructField('DayOfWeek', IntegerType(), True), StructField('DepTime', IntegerType(), True), StructField('CRSDepTime', IntegerType(), True), StructField('ArrTime', IntegerType(), True), StructField('CRSArrTime', IntegerType(), True), StructField('UniqueCarrier', StringType(), True), StructField('FlightNum', IntegerType(), True), StructField('TailNum', StringType(), True), StructField('ActualElapsedTime', IntegerType(), True), StructField('CRSElapsedTime', IntegerType(), True), StructField('AirTime', IntegerType(), True), StructField('ArrDelay', IntegerType(), True), StructField('DepDelay', IntegerType(), True), StructField('Origin', StringType(), True), StructField('Dest', StringType(), True), StructField('Distance', IntegerType(), True), StructField('TaxiIn', IntegerType(), True), StructField('TaxiOut', IntegerType(), True), StructField('Cance

In [32]:
# Create Date column
df_parquet = df_parquet.withColumn(
    "FlightDate",
    sf.to_date(
        sf.concat_ws("-", 
                    sf.col("Year"), 
                    sf.lpad(sf.col("Month").cast("string"), 2, "0"), 
                    sf.lpad(sf.col("DayOfMonth").cast("string"), 2, "0"))
    )
)

In [33]:
df_parquet.select(df_parquet.FlightDate).show()

[Stage 25:>                                                         (0 + 1) / 1]

+----------+
|FlightDate|
+----------+
|2000-03-09|
|2000-01-03|
|2000-01-10|
|2000-03-12|
|2000-02-16|
|2000-01-17|
|2000-03-29|
|2000-03-13|
|2000-01-03|
|2000-03-26|
|2000-03-30|
|2000-01-18|
|2000-02-12|
|2000-02-09|
|2000-01-22|
|2000-02-06|
|2000-01-27|
|2000-01-15|
|2000-03-31|
|2000-01-27|
+----------+
only showing top 20 rows



                                                                                

In [34]:
# Write partitioned, clustered data to BigQuery
spark.conf.set('temporaryGcsBucket', 'data_expo_temp_bucket')
output_dataset = "data-expo-pipeline.data_expo_dataset.final_data_table"

In [35]:
df_parquet.write \
    .format("bigquery") \
    .option("table", output_dataset) \
    .option("partitionField", "FlightDate") \
    .option("clusteredFields", "Origin,Dest,UniqueCarrier") \
    .mode("overwrite") \
    .save()

                                                                                

In [None]:
spark.stop()