In [1]:
from pyspark.sql import SparkSession
from os.path import join, abspath

from pyspark.sql import SparkSession
from pyspark.sql import Row


warehouse_location = abspath('spark-warehouse')



In [2]:
spark = SparkSession \
    .builder \
    .appName("Python Spark SQL Hive integration example") \
    .config("spark.sql.warehouse.dir", warehouse_location) \
    .enableHiveSupport() \
    .getOrCreate()



In [3]:
spark.sql("CREATE TABLE IF NOT EXISTS src (key INT, value STRING) USING hive")



DataFrame[]

In [7]:
spark.sql("""CREATE TABLE us_delay_flights_tbl(date STRING, delay INT,
distance INT, origin STRING, destination STRING)
USING csv OPTIONS (PATH
'./data/departuredelays.csv')""")

AnalysisException: Table default.us_delay_flights_tbl already exists.;

In [12]:
df_sfo = spark.sql("SELECT date, delay, origin, destination FROM us_delay_flights_tbl WHERE origin = 'SFO'")
df_jfk = spark.sql("SELECT date, delay, origin, destination FROM us_delay_flights_tbl WHERE origin = 'JFK'")

In [13]:
df_sfo.createOrReplaceGlobalTempView("us_origin_airport_SFO_global_tmp_view")
df_jfk.createOrReplaceTempView("us_origin_airport_JFK_tmp_view")

In [16]:
spark.read.table("us_origin_airport_JFK_tmp_view").show(10)
spark.sql("SELECT * FROM us_origin_airport_JFK_tmp_view").show(10)

+--------+-----+------+-----------+
|    date|delay|origin|destination|
+--------+-----+------+-----------+
|01010900|   14|   JFK|        LAX|
|01011200|   -3|   JFK|        LAX|
|01011900|    2|   JFK|        LAX|
|01011700|   11|   JFK|        LAS|
|01010800|   -1|   JFK|        SFO|
|01011540|   -4|   JFK|        DFW|
|01011705|    5|   JFK|        SAN|
|01011530|   -3|   JFK|        SFO|
|01011630|   -3|   JFK|        SJU|
|01011345|    2|   JFK|        LAX|
+--------+-----+------+-----------+
only showing top 10 rows

+--------+-----+------+-----------+
|    date|delay|origin|destination|
+--------+-----+------+-----------+
|01010900|   14|   JFK|        LAX|
|01011200|   -3|   JFK|        LAX|
|01011900|    2|   JFK|        LAX|
|01011700|   11|   JFK|        LAS|
|01010800|   -1|   JFK|        SFO|
|01011540|   -4|   JFK|        DFW|
|01011705|    5|   JFK|        SAN|
|01011530|   -3|   JFK|        SFO|
|01011630|   -3|   JFK|        SJU|
|01011345|    2|   JFK|        LAX|
+-

In [17]:
#drop views
spark.catalog.dropGlobalTempView("us_origin_airport_SFO_global_tmp_view")
spark.catalog.dropTempView("us_origin_airport_JFK_tmp_view")


In [21]:
us_flights_df = spark.sql("SELECT * FROM us_delay_flights_tbl")
us_flights_df2 = spark.table("us_delay_flights_tbl")


In [14]:
csv_file = "./data/departuredelays.csv"

In [20]:
#show metadata using catalog
spark.catalog.listDatabases()
spark.catalog.listTables()
spark.catalog.listColumns("us_delay_flights_tbl")

[Column(name='date', description=None, dataType='string', nullable=True, isPartition=False, isBucket=False),
 Column(name='delay', description=None, dataType='int', nullable=True, isPartition=False, isBucket=False),
 Column(name='distance', description=None, dataType='int', nullable=True, isPartition=False, isBucket=False),
 Column(name='origin', description=None, dataType='string', nullable=True, isPartition=False, isBucket=False),
 Column(name='destination', description=None, dataType='string', nullable=True, isPartition=False, isBucket=False)]

In [15]:
#some notes about the below scripts in the book
# Read and create a temporary view
# Infer schema (note that for larger files you
# may want to specify the schema)
df = (spark.read.format("csv")
.option("inferSchema", "true")
.option("header", "true")
.load(csv_file))

In [16]:
df.createOrReplaceTempView("us_delay_flights_tbl")

In [17]:
print(df.head())

Row(date=1011245, delay=6, distance=602, origin='ABE', destination='ATL')


In [18]:
spark.sql("""SELECT distance, origin, destination 
FROM us_delay_flights_tbl WHERE distance > 1000
ORDER BY distance DESC""").show(10)

+--------+------+-----------+
|distance|origin|destination|
+--------+------+-----------+
|    4330|   HNL|        JFK|
|    4330|   HNL|        JFK|
|    4330|   HNL|        JFK|
|    4330|   HNL|        JFK|
|    4330|   HNL|        JFK|
|    4330|   HNL|        JFK|
|    4330|   HNL|        JFK|
|    4330|   HNL|        JFK|
|    4330|   HNL|        JFK|
|    4330|   HNL|        JFK|
+--------+------+-----------+
only showing top 10 rows



In [50]:
spark.sql("""SELECT date, delay, origin, destination
FROM us_delay_flights_tbl
WHERE delay > 120 AND ORIGIN = 'SFO' AND DESTINATION = 'ORD'
ORDER by delay DESC""").show(10)

+-------+-----+------+-----------+
|   date|delay|origin|destination|
+-------+-----+------+-----------+
|2190925| 1638|   SFO|        ORD|
|1031755|  396|   SFO|        ORD|
|1022330|  326|   SFO|        ORD|
|1051205|  320|   SFO|        ORD|
|1190925|  297|   SFO|        ORD|
|2171115|  296|   SFO|        ORD|
|1071040|  279|   SFO|        ORD|
|1051550|  274|   SFO|        ORD|
|3120730|  266|   SFO|        ORD|
|1261104|  258|   SFO|        ORD|
+-------+-----+------+-----------+
only showing top 10 rows



In [52]:
#adding a temprory column with CASE keyword to our temperory view
spark.sql("""SELECT delay, origin, destination,
CASE
WHEN delay > 360 THEN 'Very Long Delays'
WHEN delay > 120 AND delay < 360 THEN 'Long Delays'
WHEN delay > 60 AND delay < 120 THEN 'Short Delays'
WHEN delay > 0 and delay < 60 THEN 'Tolerable Delays'
WHEN delay = 0 THEN 'No Delays'
ELSE 'Early'
END AS Flight_Delays
FROM us_delay_flights_tbl
ORDER BY origin, delay DESC""").show(10)

+-----+------+-----------+-------------+
|delay|origin|destination|Flight_Delays|
+-----+------+-----------+-------------+
|  333|   ABE|        ATL|  Long Delays|
|  305|   ABE|        ATL|  Long Delays|
|  275|   ABE|        ATL|  Long Delays|
|  257|   ABE|        ATL|  Long Delays|
|  247|   ABE|        ATL|  Long Delays|
|  247|   ABE|        DTW|  Long Delays|
|  219|   ABE|        ORD|  Long Delays|
|  211|   ABE|        ATL|  Long Delays|
|  197|   ABE|        DTW|  Long Delays|
|  192|   ABE|        ORD|  Long Delays|
+-----+------+-----------+-------------+
only showing top 10 rows



In [40]:
# we can see that we do not have this column here 
spark.sql('SELECT * FROM us_delay_flights_tbl').show(10)

+-------+-----+--------+------+-----------+
|   date|delay|distance|origin|destination|
+-------+-----+--------+------+-----------+
|1011245|    6|     602|   ABE|        ATL|
|1020600|   -8|     369|   ABE|        DTW|
|1021245|   -2|     602|   ABE|        ATL|
|1020605|   -4|     602|   ABE|        ATL|
|1031245|   -4|     602|   ABE|        ATL|
|1030605|    0|     602|   ABE|        ATL|
|1041243|   10|     602|   ABE|        ATL|
|1040605|   28|     602|   ABE|        ATL|
|1051245|   88|     602|   ABE|        ATL|
|1050605|    9|     602|   ABE|        ATL|
+-------+-----+--------+------+-----------+
only showing top 10 rows



In [41]:
#the exact first query using dataframe api
from pyspark.sql.functions import col, desc
(df.select("distance", "origin", "destination")
.where(col("distance") > 1000)
.orderBy(desc("distance"))).show(10)
# Or
(df.select("distance", "origin", "destination")
.where("distance > 1000")
.orderBy("distance", ascending=False).show(10))

+--------+------+-----------+
|distance|origin|destination|
+--------+------+-----------+
|    4330|   HNL|        JFK|
|    4330|   HNL|        JFK|
|    4330|   HNL|        JFK|
|    4330|   HNL|        JFK|
|    4330|   HNL|        JFK|
|    4330|   HNL|        JFK|
|    4330|   HNL|        JFK|
|    4330|   HNL|        JFK|
|    4330|   HNL|        JFK|
|    4330|   HNL|        JFK|
+--------+------+-----------+
only showing top 10 rows

+--------+------+-----------+
|distance|origin|destination|
+--------+------+-----------+
|    4330|   HNL|        JFK|
|    4330|   HNL|        JFK|
|    4330|   HNL|        JFK|
|    4330|   HNL|        JFK|
|    4330|   HNL|        JFK|
|    4330|   HNL|        JFK|
|    4330|   HNL|        JFK|
|    4330|   HNL|        JFK|
|    4330|   HNL|        JFK|
|    4330|   HNL|        JFK|
+--------+------+-----------+
only showing top 10 rows



In [62]:
#create a database which create a database in spark-warehouse and the use it 
#spark.sql("CREATE DATABASE learn_spark_db")
spark.sql("USE learn_spark_db")

DataFrame[]

In [63]:
#create a managed table in the database
spark.sql("CREATE TABLE managed_us_delay_flights_tbl (date STRING, delay INT, distance INT, origin STRING, destination STRING)")

AnalysisException: Hive support is required to CREATE Hive TABLE (AS SELECT);;
'CreateTable `managed_us_delay_flights_tbl`, org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe, ErrorIfExists


In [59]:
csv_file = "./data/departuredelays.csv"
# Schema as defined in the preceding example
schema="date STRING, delay INT, distance INT, origin STRING, destination STRING"
flights_df = spark.read.csv(csv_file, schema=schema)
flights_df.write.saveAsTable("managed_us_delay_flights_tbl")

In [64]:
spark.sql("""CREATE TABLE us_delay_flights_tbl(date STRING, delay INT,
distance INT, origin STRING, destination STRING)
USING csv OPTIONS (PATH
'./data/departuredelays.csv')""")

AnalysisException: Table learn_spark_db.us_delay_flights_tbl already exists.;

In [74]:

spark = SparkSession \
    .builder \
    .appName("Python Spark SQL Hive integration example") \
    .config("spark.sql.warehouse.dir",'./ware-house') \
    .enableHiveSupport() \
    .getOrCreate()



In [75]:
spark.sql("CREATE TABLE IF NOT EXISTS src (key INT, value STRING) USING hive")


AnalysisException: Hive support is required to CREATE Hive TABLE (AS SELECT);;
'CreateTable `learn_spark_db`.`src`, Ignore


In [3]:
#Data Sources for DataFrames and SQL Tables
#data frame    DataFrameReader, DataFrameWriter,

In [23]:
#the most efficient format is parquet, recommend to use
file = "./data/2010-summary.parquet/"
df = spark.read.format("parquet").load(file)

In [27]:
# in sql
spark.sql("""CREATE OR REPLACE TEMPORARY VIEW us_delay_flights_tbl USING parquet
OPTIONS (
path "./data/2010-summary.parquet/" ) """)

spark.sql("SELECT * FROM us_delay_flights_tbl").show()

+--------------------+-------------------+-----+
|   DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|
+--------------------+-------------------+-----+
|       United States|            Romania|    1|
|       United States|            Ireland|  264|
|       United States|              India|   69|
|               Egypt|      United States|   24|
|   Equatorial Guinea|      United States|    1|
|       United States|          Singapore|   25|
|       United States|            Grenada|   54|
|          Costa Rica|      United States|  477|
|             Senegal|      United States|   29|
|       United States|   Marshall Islands|   44|
|              Guyana|      United States|   17|
|       United States|       Sint Maarten|   53|
|               Malta|      United States|    1|
|             Bolivia|      United States|   46|
|            Anguilla|      United States|   21|
|Turks and Caicos ...|      United States|  136|
|       United States|        Afghanistan|    2|
|Saint Vincent and..

In [28]:
(df.write.format("parquet")
.mode("overwrite")
.option("compression", "snappy")
.save("./tmp/data/parquet/df_parquet"))

In [29]:
#reading json file as input
file = "./data/json/*"
df = spark.read.format("json").load(file)
#we can also create spark sql using json

In [None]:
#there is possibility to read .csv, Avro which is used by kafka for massage serializing and deserializing, orc, Image
# and other formats based on our needs
