
We can directly save a DF as a table.


Approaches:
1. read data into df > apply transformations > create view > create table > insert into table from view
2. read data into df > apply transformations > write table directly from df

In [0]:
# Explicit DDL schema for the flight-time JSON file. FL_DATE and CANCELLED are
# read as plain strings here and converted to proper types in a later cell.
flight_schema_ddl = """FL_DATE STRING, OP_CARRIER STRING, OP_CARRIER_FL_NUM INT, ORIGIN STRING, ORIGIN_CITY_NAME STRING, DEST STRING, DEST_CITY_NAME STRING, CRS_DEP_TIME INT, DEP_TIME INT, WHEELS_ON INT, TAXI_IN INT, CRS_ARR_TIME INT, ARR_TIME INT, CANCELLED STRING, DISTANCE INT"""

# Load the raw JSON with the schema above. FAILFAST makes the read raise on a
# malformed record instead of silently producing nulls.
# NOTE(review): FL_DATE is declared STRING in the schema, so the dateFormat
# option probably has no effect on this read - confirm before relying on it.
flight_time_raw_df = (
    spark.read
    .format("json")
    .schema(flight_schema_ddl)
    .option("mode", "FAILFAST")
    .option("dateFormat", "M/d/y")
    .load("/FileStore/tables/flight_time.json")
)

# Preview the first 10 raw rows.
flight_time_raw_df.show(n=10)

+--------+----------+-----------------+------+----------------+----+--------------+------------+--------+---------+-------+------------+--------+---------+--------+
| FL_DATE|OP_CARRIER|OP_CARRIER_FL_NUM|ORIGIN|ORIGIN_CITY_NAME|DEST|DEST_CITY_NAME|CRS_DEP_TIME|DEP_TIME|WHEELS_ON|TAXI_IN|CRS_ARR_TIME|ARR_TIME|CANCELLED|DISTANCE|
+--------+----------+-----------------+------+----------------+----+--------------+------------+--------+---------+-------+------------+--------+---------+--------+
|1/1/2000|        DL|             1451|   BOS|      Boston, MA| ATL|   Atlanta, GA|        1115|    1113|     1343|      5|        1400|    1348|        0|     946|
|1/1/2000|        DL|             1479|   BOS|      Boston, MA| ATL|   Atlanta, GA|        1315|    1311|     1536|      7|        1559|    1543|        0|     946|
|1/1/2000|        DL|             1857|   BOS|      Boston, MA| ATL|   Atlanta, GA|        1415|    1414|     1642|      9|        1721|    1651|        0|     946|
|1/1/2000|

In [0]:
# Confirm the DDL schema was applied: FL_DATE and CANCELLED should still be strings here.
flight_time_raw_df.printSchema()

root
 |-- FL_DATE: string (nullable = true)
 |-- OP_CARRIER: string (nullable = true)
 |-- OP_CARRIER_FL_NUM: integer (nullable = true)
 |-- ORIGIN: string (nullable = true)
 |-- ORIGIN_CITY_NAME: string (nullable = true)
 |-- DEST: string (nullable = true)
 |-- DEST_CITY_NAME: string (nullable = true)
 |-- CRS_DEP_TIME: integer (nullable = true)
 |-- DEP_TIME: integer (nullable = true)
 |-- WHEELS_ON: integer (nullable = true)
 |-- TAXI_IN: integer (nullable = true)
 |-- CRS_ARR_TIME: integer (nullable = true)
 |-- ARR_TIME: integer (nullable = true)
 |-- CANCELLED: string (nullable = true)
 |-- DISTANCE: integer (nullable = true)



In [0]:
# Preview the first 10 raw rows (same preview as at the end of the load cell).
flight_time_raw_df.show(n=10)

+--------+----------+-----------------+------+----------------+----+--------------+------------+--------+---------+-------+------------+--------+---------+--------+
| FL_DATE|OP_CARRIER|OP_CARRIER_FL_NUM|ORIGIN|ORIGIN_CITY_NAME|DEST|DEST_CITY_NAME|CRS_DEP_TIME|DEP_TIME|WHEELS_ON|TAXI_IN|CRS_ARR_TIME|ARR_TIME|CANCELLED|DISTANCE|
+--------+----------+-----------------+------+----------------+----+--------------+------------+--------+---------+-------+------------+--------+---------+--------+
|1/1/2000|        DL|             1451|   BOS|      Boston, MA| ATL|   Atlanta, GA|        1115|    1113|     1343|      5|        1400|    1348|        0|     946|
|1/1/2000|        DL|             1479|   BOS|      Boston, MA| ATL|   Atlanta, GA|        1315|    1311|     1536|      7|        1559|    1543|        0|     946|
|1/1/2000|        DL|             1857|   BOS|      Boston, MA| ATL|   Atlanta, GA|        1415|    1414|     1642|      9|        1721|    1651|        0|     946|
|1/1/2000|

In [0]:
import pyspark.sql.functions as F

# Add FL_DATE_DT: the raw FL_DATE string parsed into a timestamp.
# The pattern has to describe the text as it appears in the data ("1/1/2000",
# i.e. month/day/year), so "M/d/y" is used - the same pattern the to_date
# conversion in this notebook uses. Note that in Spark datetime patterns
# upper-case "M" is month while lower-case "m" is minute-of-hour; a pattern
# such as "yyyy-mm-dd" does not match this data and yields NULLs (or a parse
# error under strict/ANSI parsing).
flight_time_raw_df1 = flight_time_raw_df.withColumn(
    "FL_DATE_DT",
    F.to_timestamp(F.col("FL_DATE"), "M/d/y"),
)

# FL_DATE_DT should be listed as a timestamp column at the end of the schema.
flight_time_raw_df1.printSchema()

root
 |-- FL_DATE: string (nullable = true)
 |-- OP_CARRIER: string (nullable = true)
 |-- OP_CARRIER_FL_NUM: integer (nullable = true)
 |-- ORIGIN: string (nullable = true)
 |-- ORIGIN_CITY_NAME: string (nullable = true)
 |-- DEST: string (nullable = true)
 |-- DEST_CITY_NAME: string (nullable = true)
 |-- CRS_DEP_TIME: integer (nullable = true)
 |-- DEP_TIME: integer (nullable = true)
 |-- WHEELS_ON: integer (nullable = true)
 |-- TAXI_IN: integer (nullable = true)
 |-- CRS_ARR_TIME: integer (nullable = true)
 |-- ARR_TIME: integer (nullable = true)
 |-- CANCELLED: string (nullable = true)
 |-- DISTANCE: integer (nullable = true)
 |-- FL_DATE_DT: timestamp (nullable = true)



In [0]:
from pyspark.sql.functions import to_date, expr

# Build the cleaned DataFrame from the raw one:
#   * FL_DATE   - parsed from an "M/d/y" string into a date
#   * CANCELLED - mapped to a boolean: true when the raw value equals 1, false otherwise
flight_time_df = (
    flight_time_raw_df
    .withColumn("FL_DATE", to_date("FL_DATE", "M/d/y"))
    .withColumn("CANCELLED", expr("if(CANCELLED == 1, true, false)"))
)

In [0]:
# Preview the first 10 transformed rows: FL_DATE is now a date, CANCELLED a boolean.
flight_time_df.show(n=10)

+----------+----------+-----------------+------+----------------+----+--------------+------------+--------+---------+-------+------------+--------+---------+--------+
|   FL_DATE|OP_CARRIER|OP_CARRIER_FL_NUM|ORIGIN|ORIGIN_CITY_NAME|DEST|DEST_CITY_NAME|CRS_DEP_TIME|DEP_TIME|WHEELS_ON|TAXI_IN|CRS_ARR_TIME|ARR_TIME|CANCELLED|DISTANCE|
+----------+----------+-----------------+------+----------------+----+--------------+------------+--------+---------+-------+------------+--------+---------+--------+
|2000-01-01|        DL|             1451|   BOS|      Boston, MA| ATL|   Atlanta, GA|        1115|    1113|     1343|      5|        1400|    1348|    false|     946|
|2000-01-01|        DL|             1479|   BOS|      Boston, MA| ATL|   Atlanta, GA|        1315|    1311|     1536|      7|        1559|    1543|    false|     946|
|2000-01-01|        DL|             1857|   BOS|      Boston, MA| ATL|   Atlanta, GA|        1415|    1414|     1642|      9|        1721|    1651|    false|     946

In [0]:
# Approach 2: persist the transformed DataFrame directly as a table.
# "overwrite" replaces any existing contents of flight_time_tbl on each run.
(
    flight_time_df.write
    .format("parquet")
    .mode("overwrite")
    .saveAsTable("flight_time_tbl")
)


1. Write modes: `overwrite`, `append`, `error` / `errorifexists` (the default), and `ignore`.
2. Spark tables don't enforce any primary-key (PK) constraints.
3. The `append` and `overwrite` modes also create the table if it doesn't exist.
4. The `ignore` mode does nothing if the table already exists; if it doesn't exist, the table is created and the data is written.

In [0]:
%sql

-- Inspect the saved table: column names and types, followed by detailed table metadata.
DESCRIBE EXTENDED flight_time_tbl

col_name,data_type,comment
FL_DATE,date,
OP_CARRIER,string,
OP_CARRIER_FL_NUM,int,
ORIGIN,string,
ORIGIN_CITY_NAME,string,
DEST,string,
DEST_CITY_NAME,string,
CRS_DEP_TIME,int,
DEP_TIME,int,
WHEELS_ON,int,
