## Spark CSV to ORC Conversion

In [1]:
import os, sys
from pyspark.sql import SparkSession
from pyspark.sql import SQLContext

In [2]:
# Create a SparkSession. There is no need to create a SparkContext
# separately: it is created automatically as part of the SparkSession.
#
# The connector jar path is written as a raw string (r'...'): in a normal
# string literal the Windows backslashes form invalid escape sequences
# (\c, \m) that only work by accident and raise warnings on newer Pythons.
spark = SparkSession.builder\
                .master("local[*]")\
                .appName("ETL")\
                .config('spark.driver.extraClassPath', r'D:\conn-mysql\mysql-connector-java-5.1.46\mysql-connector-java-5.1.46.jar')\
                .getOrCreate()

# Other configuration options you might want to set are listed at:
# https://spark.apache.org/docs/latest/configuration.html
# Once the session is up, Spark's web UI is available at
# http://localhost:4040 (the default port).

In [3]:
# Show the directory the notebook kernel is currently running in.
print(os.getcwd())



C:\Users\me\Documents\GitHub\ETL-for-Airline-Analytics


### Loading the CSV file into a DataFrame

In [4]:
# Load the 1987 airline CSV with Spark's built-in CSV reader.
# 'com.databricks.spark.csv' is the name of the legacy external spark-csv
# package; with a SparkSession (Spark 2.0+) CSV support is built in.
# The first line is treated as the header and ',' is the field separator.
# No schema is supplied and inferSchema is not enabled, so every column is
# read as a string.
df_airline = spark.read.csv(
    r'D:\Fall-18\Project-Data\data\1987.csv',
    header=True,
    sep=',',
)

# Row count of the loaded file (triggers a full scan of the CSV).
df_airline.count()

1311826

In [5]:
# Print the column names, types and nullability of the loaded DataFrame.
df_airline.printSchema()

root
 |-- Year: string (nullable = true)
 |-- Month: string (nullable = true)
 |-- DayofMonth: string (nullable = true)
 |-- DayOfWeek: string (nullable = true)
 |-- DepTime: string (nullable = true)
 |-- CRSDepTime: string (nullable = true)
 |-- ArrTime: string (nullable = true)
 |-- CRSArrTime: string (nullable = true)
 |-- UniqueCarrier: string (nullable = true)
 |-- FlightNum: string (nullable = true)
 |-- TailNum: string (nullable = true)
 |-- ActualElapsedTime: string (nullable = true)
 |-- CRSElapsedTime: string (nullable = true)
 |-- AirTime: string (nullable = true)
 |-- ArrDelay: string (nullable = true)
 |-- DepDelay: string (nullable = true)
 |-- Origin: string (nullable = true)
 |-- Dest: string (nullable = true)
 |-- Distance: string (nullable = true)
 |-- TaxiIn: string (nullable = true)
 |-- TaxiOut: string (nullable = true)
 |-- Cancelled: string (nullable = true)
 |-- CancellationCode: string (nullable = true)
 |-- Diverted: string (nullable = true)
 |-- CarrierDelay:

In [7]:
# Project three columns and pull the first 500 rows to the driver.
# Note: take() returns a plain Python list of Row objects, not a DataFrame,
# despite the `_df` suffix in the variable name.
subset_df = (
    df_airline
    .select('DayOfWeek', 'Distance', 'WeatherDelay')
    .take(500)
)

### Creating a temporary view for SQL querying

In [6]:
# Register df_airline under the name "mytempTable" so it can be referenced
# from spark.sql(...) queries; an existing view of that name is replaced.
df_airline.createOrReplaceTempView("mytempTable") 

In [7]:
# Materialise the temp view as the table `mytable`.
# A plain CREATE TABLE fails if `mytable` already exists (e.g. when this
# cell is re-run), so drop any previous copy first to keep the cell
# re-runnable and the table in sync with the current contents of the view.
# The trailing ';' suppresses the notebook's display of the returned DataFrame.
spark.sql("drop table if exists mytable");
spark.sql("create table mytable as select * from mytempTable");

### Writing in ORC format

In [9]:
# Select 500 rows from `mytable` and write them out in ORC format.
# There is no ORDER BY, so which 500 rows are returned is not guaranteed.
op = spark.sql("select * from mytable limit 500")

# The output path is a raw string: in a normal string literal the Windows
# backslashes form invalid escape sequences (\c, \m, \A) that only work by
# accident. mode("overwrite") replaces any previous output, so re-running
# the cell does not fail because the target path already exists.
op.write.format("orc").mode("overwrite").save(r"D:\conn-mysql\mysql-connector-java-5.1.46\Airline")

In [10]:
# Read the ORC output back into a DataFrame.
# The path is a raw string: in a normal string literal the Windows
# backslashes form invalid escape sequences (\c, \m, \A) that only work by
# accident and raise warnings on newer Python versions.
orc_df = spark.read.orc(r"D:\conn-mysql\mysql-connector-java-5.1.46\Airline")

In [11]:
# Peek at the first five rows of the DataFrame read back from ORC.
orc_df.head(5)

[Row(Year='1987', Month='10', DayofMonth='14', DayOfWeek='3', DepTime='741', CRSDepTime='730', ArrTime='912', CRSArrTime='849', UniqueCarrier='PS', FlightNum='1451', TailNum='NA', ActualElapsedTime='91', CRSElapsedTime='79', AirTime='NA', ArrDelay='23', DepDelay='11', Origin='SAN', Dest='SFO', Distance='447', TaxiIn='NA', TaxiOut='NA', Cancelled='0', CancellationCode='NA', Diverted='0', CarrierDelay='NA', WeatherDelay='NA', NASDelay='NA', SecurityDelay='NA', LateAircraftDelay='NA'),
 Row(Year='1987', Month='10', DayofMonth='15', DayOfWeek='4', DepTime='729', CRSDepTime='730', ArrTime='903', CRSArrTime='849', UniqueCarrier='PS', FlightNum='1451', TailNum='NA', ActualElapsedTime='94', CRSElapsedTime='79', AirTime='NA', ArrDelay='14', DepDelay='-1', Origin='SAN', Dest='SFO', Distance='447', TaxiIn='NA', TaxiOut='NA', Cancelled='0', CancellationCode='NA', Diverted='0', CarrierDelay='NA', WeatherDelay='NA', NASDelay='NA', SecurityDelay='NA', LateAircraftDelay='NA'),
 Row(Year='1987', Month=