In [0]:
# Read the raw flight JSON and let Spark infer the schema from the data.
# FAILFAST makes the read abort on the first malformed record instead of hiding it.
df = (
    spark.read
    .format("json")
    .option('mode', 'FAILFAST')
    .option('dateFormat', 'M/d/yyyy')
    .load(path='/Volumes/dev/spark_db/datasets/spark_programming/data/flight-time.json')
)

# mode = PERMISSIVE    -> keeps every record; fields that cannot be parsed become null
#                         (the raw malformed text can be kept in a corrupt-record column)
#      = DROPMALFORMED -> drops malformed records entirely
#      = FAILFAST      -> throws an exception as soon as it meets a malformed record

# dateFormat -> pattern used to parse the dates stored in the JSON file,
#               e.g. a value like 1/1/2000 matches M/d/yyyy

In [0]:
# Show the schema Spark inferred from the JSON file (no explicit schema was given).
df.printSchema()

# Two problems with the inferred schema:
#    1) Column order: the columns do not come out in the order of the source data
#       (JSON schema inference sorts fields by name).
#    2) Date type: FL_DATE is not read as a date, even though the dateFormat option is set.
#       The pattern is only applied to columns declared as DateType, which is why the
#       explicit schema used later declares FL_DATE as DateType.

In [0]:
# Preview the first 3 rows of the inferred-schema DataFrame.
# NOTE: DataFrame.display() is a Databricks notebook helper, not part of open-source PySpark.
df.limit(3).display()

In [0]:
   FL_DATE DATE, 
    OP_CARRIER STRING, 
    OP_CARRIER_FL_NUM STRING, 
    ORIGIN STRING, 
    ORIGIN_CITY_NAME STRING, 
    DEST STRING, 
    DEST_CITY_NAME STRING, 
    CRS_DEP_TIME LONG, 
    DEP_TIME LONG, 
    WHEELS_ON INT, 
    TAXI_IN INT, 
    CRS_ARR_TIME LONG, 
    ARR_TIME LONG, 
    CANCELLED INT, 
    DISTANCE INT

In [0]:
# An explicit schema improves read performance:
# without one, Spark has to scan the data to infer the type of every column;
# with .schema(...) it uses the predefined schema and skips that inference scan.

from pyspark.sql.types import StructType , StructField , StringType , IntegerType , DateType , LongType

# Explicit schema for flight-time.json, listed in the intended column order.
# Each StructField keeps its default nullable=True.
# A list (not a generator) is passed because StructType iterates `fields` more than once.
flight_schema = StructType([
    StructField(column_name, column_type)
    for column_name, column_type in (
        ("FL_DATE", DateType()),
        ("OP_CARRIER", StringType()),
        ("OP_CARRIER_FL_NUM", StringType()),
        ("ORIGIN", StringType()),
        ("ORIGIN_CITY_NAME", StringType()),
        ("DEST", StringType()),
        ("DEST_CITY_NAME", StringType()),
        ("CRS_DEP_TIME", LongType()),
        ("DEP_TIME", LongType()),
        ("WHEELS_ON", IntegerType()),
        ("TAXI_IN", IntegerType()),
        ("CRS_ARR_TIME", LongType()),
        ("ARR_TIME", LongType()),
        ("CANCELLED", IntegerType()),
        ("DISTANCE", IntegerType()),
    )
])

In [0]:
# Re-read the same file with the explicit flight_schema:
# - no inference pass;
# - columns follow the schema's order;
# - FL_DATE is parsed as a date using the M/d/yyyy pattern.
# FAILFAST aborts the read on any record that does not fit the schema.
df_on_schema = (
    spark.read
    .format("json")
    .schema(flight_schema)
    .option('mode', 'FAILFAST')
    .option('dateFormat', 'M/d/yyyy')
    .load(path='/Volumes/dev/spark_db/datasets/spark_programming/data/flight-time.json')
)

In [0]:
# Confirm the explicit schema was applied: flight_schema's column order, FL_DATE as date.
df_on_schema.printSchema()

In [0]:
# Preview 3 rows read with the explicit schema (FL_DATE parsed with dateFormat 'M/d/yyyy').
# NOTE: DataFrame.display() is a Databricks notebook helper, not part of open-source PySpark.
df_on_schema.limit(3).display()

In [0]:
# Persist the typed data as table dev.spark_db.flight_time_raw.
# mode="overwrite" replaces any existing data in that table, so re-running this cell is safe.
df_on_schema.write.saveAsTable("dev.spark_db.flight_time_raw", mode='overwrite')

In [0]:
# Read the saved table back (first 5 rows) to confirm the write succeeded.
spark.sql("select * from dev.spark_db.flight_time_raw limit 5").display()