In [0]:
# Display the Spark version reported by the SparkContext `sc`.
# `sc` is not created in the visible cells, so it must already exist in the session.
sc.version

In [0]:
# List the tables registered in the session catalog; as the cell's last
# expression, the result is rendered as the cell output.
spark.catalog.listTables()

In [0]:
from pyspark.sql import SparkSession

# Obtain a session handle through the builder and print it.
session_builder = SparkSession.builder
my_spark = session_builder.getOrCreate()
print(my_spark)

In [0]:
# Fetch ten rows of the `flights` table through Spark SQL and display them.
query = "FROM flights SELECT * LIMIT 10"
flights10 = spark.sql(query)
flights10.show()

In [0]:
# Count flights per (origin, dest) pair in Spark, then bring the aggregated
# result to the driver as a pandas DataFrame.
query = "SELECT origin, dest, COUNT(*) as N FROM flights GROUP BY origin, dest"
flight_counts = spark.sql(query)
pd_counts = flight_counts.toPandas()

# Print only the first rows of the pandas result.
counts_preview = pd_counts.head()
print(counts_preview)

In [0]:
import pandas as pd
import numpy as np

# Build a one-column pandas DataFrame holding ten random floats.
random_values = np.random.random(10)
pd_temp = pd.DataFrame(random_values)
print(pd_temp)

# Convert the pandas DataFrame into a Spark DataFrame.
spark_temp = spark.createDataFrame(pd_temp)
print(spark_temp)

# Catalog contents before the temp view is registered.
print(spark.catalog.listTables())

# Register the Spark DataFrame as the temporary view "temp",
# replacing any existing view of that name.
spark_temp.createOrReplaceTempView("temp")

In [0]:
# NOTE: this call is not the cell's last expression and its result is not
# printed, so nothing is displayed for it.
spark.catalog.listTables()

# Load the airports CSV, treating the first line as the header row.
airports_path = "/FileStore/tables/airports.csv"
airports = spark.read.csv(airports_path, header=True)
airports.show()

In [0]:
# Get a DataFrame handle on the `flights` catalog table and preview it.
flights = spark.table("flights")
flights.show()

# Derive "duration_hrs" by dividing the air_time column by 60.
duration_hrs = flights["air_time"] / 60
flights = flights.withColumn("duration_hrs", duration_hrs)
flights.show()

In [0]:
# Variant 1: build the boolean column expression first, then filter with it.
distance_condition = flights["distance"] > 1000
long_flights1 = flights.filter(distance_condition)
long_flights1.show()

# Variant 2: the same filter written inline.
long_flights2 = flights.filter(flights["distance"] > 1000)
long_flights2.show()

In [0]:
# Project three columns by name.
selected1 = flights.select("tailnum", "origin", "dest")
selected1.show()

# Project three other columns, then keep only rows with origin "SEA" and dest "PDX".
from_sea = flights["origin"] == "SEA"
to_pdx = flights["dest"] == "PDX"
selected2 = (
    flights.select("origin", "dest", "carrier")
    .filter(from_sea)
    .filter(to_pdx)
)
selected2.show()



In [0]:

# Add "avg_speed" as distance divided by duration_hrs
# (duration_hrs is the column added to `flights` in an earlier cell).
avg_speed = flights["distance"] / flights["duration_hrs"]
flights = flights.withColumn("avg_speed", avg_speed)
flights.show()



In [0]:
from pyspark.sql.types import IntegerType

# Cast distance to integer, then show the minimum distance over flights with origin "PDX".
distance_int = flights["distance"].cast(IntegerType())
flights = flights.withColumn("distance", distance_int)
pdx_flights = flights.filter(flights["origin"] == "PDX")
pdx_flights.groupBy().min("distance").show()

# Cast air_time to integer, then show the maximum air_time over flights with origin "SEA".
air_time_int = flights["air_time"].cast(IntegerType())
flights = flights.withColumn("air_time", air_time_int)
sea_flights = flights.filter(flights["origin"] == "SEA")
sea_flights.groupBy().max("air_time").show()



In [0]:
# Average air_time over flights whose carrier is "DL".
# DataFrame.alias() names the DataFrame itself, not its columns, so the
# aggregate column would still be shown as "avg(air_time)". Rename the
# column explicitly so the output header reads "avg_air_time".
dl_flights = flights.filter(flights.carrier == "DL")
(
    dl_flights.groupBy()
    .avg("air_time")
    .withColumnRenamed("avg(air_time)", "avg_air_time")
    .show()
)

# Total air_time across all flights.
flights.groupBy().sum("air_time").show()

In [0]:
# Flight counts per tail number and per origin.
# .show() prints and returns None, so the DataFrame is assigned first and
# displayed in a separate step; otherwise by_plane / by_origin would be None.
by_plane = flights.groupBy("tailnum").count()
by_plane.show()

by_origin = flights.groupBy("origin").count()
by_origin.show()

# Average air_time for flights with origin "PDX" and dest "SEA".
flights.filter(flights.origin == "PDX").filter(flights.dest == "SEA").groupBy().avg("air_time").show()

In [0]:
import pyspark.sql.functions as F

# Cast dep_delay to integer so it can be aggregated numerically.
dep_delay_int = flights["dep_delay"].cast(IntegerType())
flights = flights.withColumn("dep_delay", dep_delay_int)

# Group once by (month, dest) and reuse the grouping for both aggregates.
by_month_dest = flights.groupBy("month", "dest")

# Mean departure delay per group.
by_month_dest.avg("dep_delay").show()

# Standard deviation of departure delay per group.
dep_delay_stddev = F.stddev("dep_delay")
by_month_dest.agg(dep_delay_stddev).show()

In [0]:
# NOTE: this call's result is neither printed nor the cell's last expression,
# so nothing is displayed for it.
spark.catalog.listTables()
# airports.show()

# Rename airports' "faa" column to "dest" so both frames share the join key name.
airports = airports.withColumnRenamed("faa", "dest")

# Left outer join: every flights row is kept, with airport columns attached where "dest" matches.
flights_with_airports = flights.join(
    airports,
    on="dest",
    how="leftouter",
)
flights_with_airports.show()

In [0]:
# Load the planes CSV, treating the first line as the header row.
planes_path = "/FileStore/tables/planes.csv"
planes = spark.read.csv(planes_path, header=True)
# planes.show()

# Rename planes' "year" column to "plane_year" before joining with flights.
planes = planes.withColumnRenamed("year", "plane_year")
# planes.show()

# Left outer join on "tailnum": every flights row is kept.
model_data = flights.join(
    planes,
    on="tailnum",
    how="leftouter",
)

# model_data.show()


In [0]:
# Cast each of these columns to integer, in the same order as before.
# IntegerType is imported in an earlier cell.
for int_column in ("arr_delay", "air_time", "month", "plane_year"):
    model_data = model_data.withColumn(
        int_column,
        model_data[int_column].cast(IntegerType()),
    )

model_data.show()

In [0]:
# Plane age at the time of the flight: the flight's "year" minus the plane's
# "plane_year". The operands were previously reversed (plane_year - year),
# which has the wrong sign for an age.
model_data = model_data.withColumn("plane_age", model_data.year - model_data.plane_year)

#model_data.show()



In [0]:
# Boolean flag: True when arr_delay is greater than 0.
arrived_late = model_data["arr_delay"] > 0
model_data = model_data.withColumn("is_late", arrived_late)

# model_data.show()

# Integer copy of the flag, stored in the "label" column.
label_column = model_data["is_late"].cast("integer")
model_data = model_data.withColumn("label", label_column)

# Keep only rows where none of the four listed columns is NULL.
not_null_condition = "arr_delay is not NULL and dep_delay is not NULL and air_time is not NULL and plane_year is not NULL"
model_data = model_data.filter(not_null_condition)

In [0]:
import pyspark.ml.feature as f

# Stage pair for the "carrier" column: string -> index -> one-hot vector.
carr_indexer = f.StringIndexer(
    inputCol="carrier",
    outputCol="carrier_index",
)
carr_encoder = f.OneHotEncoder(
    inputCol="carrier_index",
    outputCol="carrier_fact",
)



In [0]:
# Stage pair for the "dest" column: string -> index -> one-hot vector.
dest_indexer = f.StringIndexer(
    inputCol="dest",
    outputCol="dest_index",
)
dest_encoder = f.OneHotEncoder(
    inputCol="dest_index",
    outputCol="dest_fact",
)

In [0]:
# Combine the listed input columns into a single vector column named "features".
feature_columns = ["month", "air_time", "carrier_fact", "dest_fact", "plane_age"]
vec_assembler = f.VectorAssembler(inputCols=feature_columns, outputCol="features")

In [0]:
from pyspark.ml import Pipeline

# Stage order matters: each indexer precedes its encoder, and the assembler
# runs last because it consumes the encoders' output columns.
pipeline_stages = [
    dest_indexer,
    dest_encoder,
    carr_indexer,
    carr_encoder,
    vec_assembler,
]
flights_pipe = Pipeline(stages=pipeline_stages)

In [0]:
# Fit the pipeline on model_data, then apply the fitted pipeline to the same data.
fitted_flights_pipe = flights_pipe.fit(model_data)
piped_data = fitted_flights_pipe.transform(model_data)



In [0]:
# Split into training and test sets with weights 0.6 / 0.4.
# A fixed seed is passed so the split does not change between runs.
training, test = piped_data.randomSplit([0.6, 0.4], seed=42)

In [0]:
# This cell repeats the split from the previous cell and rebinds
# `training` and `test`. Weights are 0.6 / 0.4; a fixed seed is passed so
# the split does not change between runs.
training, test = piped_data.randomSplit([0.6, 0.4], seed=42)

In [0]:
from pyspark.ml.classification import LogisticRegression

# Logistic regression estimator, constructed with no arguments (library defaults).
lr = LogisticRegression()

In [0]:
import pyspark.ml.evaluation as evals

# Binary-classification evaluator configured with the "areaUnderROC" metric.
evaluator = evals.BinaryClassificationEvaluator(
    metricName="areaUnderROC",
)

In [0]:
import pyspark.ml.tuning as tune
import numpy as np

# Candidate values: regParam from 0 up to (not including) 0.1 in steps of 0.01,
# and elasticNetParam of 0 or 1.
reg_param_values = np.arange(0, 0.1, 0.01)
elastic_net_values = [0, 1]

# Build the grid of all combinations of the two parameters.
grid = (
    tune.ParamGridBuilder()
    .addGrid(lr.regParam, reg_param_values)
    .addGrid(lr.elasticNetParam, elastic_net_values)
    .build()
)

In [0]:
# Cross-validator that tries every parameter map in `grid` on `lr`
# and scores the candidates with `evaluator`.
cv = tune.CrossValidator(
    estimator=lr,
    estimatorParamMaps=grid,
    evaluator=evaluator,
)

In [0]:
# Run the cross-validated grid search on the training split.
models = cv.fit(training)

# Keep the best model found by the search. This was previously overwritten
# straight away by `lr.fit(training)`, which discarded the tuned model and
# replaced it with an untuned fit of the default estimator.
best_lr = models.bestModel

print(best_lr)

In [0]:
# Apply the selected model to the held-out test split and print the evaluator's score.
test_results = best_lr.transform(test)
test_score = evaluator.evaluate(test_results)
print(test_score)