# connect to the cluster

In [None]:
# Execute the helper notebook in this kernel so that whatever it defines becomes available here.
# NOTE(review): later cells use `sc` and `spark`; presumably both are created by this notebook — confirm.
%run connect-to-spark-container.ipynb

In [None]:
# Display `sc` as a quick check that the connection step succeeded.
# NOTE(review): `sc` is not defined in this notebook; presumably the SparkContext from the connect notebook.
sc

# examine the data

In [None]:
# List the top-level contents of the /data directory (shell command run from the notebook).
!ls /data

In [None]:
# List the contents of the flight-data directory.
!ls /data/flight-data

In [None]:
# List the CSV files available for the flight data.
!ls /data/flight-data/csv/

In [None]:
# Print the raw CSV in full.
# NOTE: this dumps every line of the file into the cell output; `!head` on the same file gives a short preview.
!cat /data/flight-data/csv/2010-summary.csv

In [None]:
# Preview only the first lines of the CSV (header row plus a few records).
!head /data/flight-data/csv/2010-summary.csv

# create the dataframe

In [None]:
# Load the 2010 flight summary CSV from the local filesystem into a DataFrame.
#   header=true      -> take column names from the first line of the file
#   inferSchema=true -> let Spark derive column types from the data; without it every
#                       column (including `count`) is read as a string, which makes
#                       min/max statistics lexicographic and arithmetic rely on implicit casts
flightDF = (
    spark.read
    .option("header", "true")
    .option("inferSchema", "true")
    .csv("file:///data/flight-data/csv/2010-summary.csv")
)

In [None]:
# Print the first rows of the DataFrame as a text table (show() defaults to 20 rows).
flightDF.show()

In [None]:
# Turn on eager evaluation for the REPL/notebook so that a bare DataFrame expression
# at the end of a cell is rendered as a table instead of only its schema repr.
spark.conf.set('spark.sql.repl.eagerEval.enabled', True)

In [None]:
# With eager evaluation enabled, evaluating the DataFrame displays its rows directly.
flightDF

## examine structure and metadata

In [None]:
# Column names as a plain Python list.
flightDF.columns

In [None]:
# Print the physical plan Spark will use to compute this DataFrame.
# explain is a method: it must be called, otherwise the cell only shows a bound-method repr.
flightDF.explain()

In [None]:
# Print the schema (column names, types, nullability) in tree form.
flightDF.printSchema()

In [None]:
# Basic statistics per column: count, mean, stddev, min, max.
# The result is itself a DataFrame, so it is displayed as a table.
flightDF.describe()

In [None]:
# Like describe(), but by default also includes the 25%, 50% and 75% percentiles.
flightDF.summary()

In [None]:
# New DataFrame restricted to at most 10 rows; flightDF itself is not modified.
flightDF.limit(10)

In [None]:
# Bring every row back to the driver as a Python list of Row objects.
# NOTE: fine for this small summary file, but collect() on a large DataFrame can exhaust driver memory.
flightDF.collect()

## select

In [None]:
# Project two columns by name; returns a new DataFrame containing only those columns.
flightDF.select("DEST_COUNTRY_NAME", "count")

In [None]:
# Two equivalent ways to select the first column by position.
# Only the last expression of a cell is displayed, so the result of the first line is discarded.
flightDF.select(flightDF.columns[0]) # via the column name looked up in the columns list, OR
flightDF.select(flightDF[0])  # via integer indexing on the DataFrame, which yields a Column

In [None]:
# distinct: keep each destination country once (duplicate rows of the projection are removed)
flightDF.select("DEST_COUNTRY_NAME").distinct()

In [None]:
# sort: unique destination countries, ordered ascending by name (sort() defaults to ascending)
flightDF.select("DEST_COUNTRY_NAME").distinct().sort("DEST_COUNTRY_NAME")

## drop a column

In [None]:
# drop() returns a new DataFrame without the given column; flightDF itself still has it.
# The result is only assigned here, so this cell produces no output.
new_flight_df = flightDF.drop("ORIGIN_COUNTRY_NAME")

## rename column

In [None]:
# Rename both country columns in one chain and print the first 10 rows.
# withColumnRenamed returns a new DataFrame each time; flightDF keeps its original column names.
(
    flightDF
    .withColumnRenamed("DEST_COUNTRY_NAME", "Destination")
    .withColumnRenamed("ORIGIN_COUNTRY_NAME", "Origin")
    .show(10)
)

## drop nulls

In [None]:
# Remove rows containing nulls.
#   how='any' -> drop a row if at least one column is null
#   how='all' -> drop a row only if every column is null
flightDF.dropna(how='any')

## drop duplicates

In [109]:
# Remove fully duplicated rows (no subset given, so all columns are compared) and print 2 rows.
flightDF.dropDuplicates().show(2)

+-----------------+-------------------+-----+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|
+-----------------+-------------------+-----+
|           Jordan|      United States|   50|
|          Denmark|      United States|   98|
+-----------------+-------------------+-----+
only showing top 2 rows



## SQL functions

In [None]:
import pyspark.sql.functions as f

## add columns

In [None]:
# Add a column named "desc" holding the same literal string "flight" in every row.
# withColumn returns a new DataFrame; flightDF is unchanged.
flightDF.withColumn("desc", f.lit("flight"))

In [None]:
# Add a derived column that labels each row by the parity of `count`,
# then print 20 rows without truncating long cell values.
flightDF.withColumn(
    "Odd or Even",
    f.when(f.col("count") % 2 == 0, "even").otherwise("odd"),
).show(20, truncate=False)

## split columns

In [None]:
# Re-check the column names; the withColumn calls above did not modify flightDF.
flightDF.columns

In [None]:
# Split the destination name on a single space and expose the first two pieces as columns.
# The split expression is built once and reused for both items.
dest_parts = f.split(f.col("DEST_COUNTRY_NAME"), " ")
flightDF.select(
    dest_parts.getItem(0).alias("first"),
    dest_parts.getItem(1).alias("second"),
)

In [None]:
# Display flightDF again: every transformation above returned a new DataFrame,
# so the original still has its initial columns.
flightDF

# stop all

In [None]:
# Execute the companion shutdown notebook in this kernel.
# NOTE(review): presumably stops the Spark session/context started by the connect notebook — confirm.
%run stop-all.ipynb