# Chapter 2 - Introduction to Spark

In [None]:
myRange = spark.range(1000).toDF("number")


In [None]:
divisBy2 = myRange.where("number % 2 = 0")


In [None]:
# show() only prints the rows and returns None, so its result must not be
# assigned: keep the DataFrame in divisBy2 (the count() cell below needs it)
# and display it separately.
divisBy2 = myRange.where("number % 2 = 0")
divisBy2.show()

In [None]:
divisBy2.count()


In [None]:
# Read the 2015 flight summary CSV: the first line is the header and
# column types are inferred from the data.
flightData2015 = (
    spark.read
    .option("inferSchema", "true")
    .option("header", "true")
    .csv("dbfs:/FileStore/2015_summary.csv")
)

In [None]:
flightData2015.take(3)

In [None]:
flightData2015.sort("count").explain()

In [None]:
spark.conf.set("spark.sql.shuffle.partitions", "5")

In [None]:
flightData2015.sort("count").take(2)

In [None]:
# Register the DataFrame as a temporary view named "flight_data_2015"
# so that it can be queried with spark.sql().
flightData2015.createOrReplaceTempView("flight_data_2015")

In [None]:
# Express the same per-destination row count twice, once as SQL against the
# temp view and once with the DataFrame API, then print the plan of each.
sqlWay = spark.sql("""
SELECT DEST_COUNTRY_NAME, count(1)
FROM flight_data_2015
GROUP BY DEST_COUNTRY_NAME
""")

dataFrameWay = flightData2015.groupBy("DEST_COUNTRY_NAME").count()

for plan_owner in (sqlWay, dataFrameWay):
    plan_owner.explain()



In [None]:
spark.sql("SELECT max(count) from flight_data_2015").take(1)

In [None]:
# Reach Spark's max through the module alias: `from ... import max` would
# shadow Python's built-in max() in the notebook namespace.
from pyspark.sql import functions as F

flightData2015.select(F.max("count")).take(1)

In [None]:
# Multi-transformation query: the top five destination countries by total count.

maxSql = spark.sql("""
SELECT DEST_COUNTRY_NAME, sum(count) as destination_total
FROM flight_data_2015
GROUP BY DEST_COUNTRY_NAME
ORDER BY sum(count) DESC
LIMIT 5
""")
maxSql.show()

In [None]:
# DataFrame-API version of the top-five query: total the counts per
# destination, rename the aggregate, sort descending and show five rows.
from pyspark.sql.functions import desc

(
    flightData2015
    .groupBy("DEST_COUNTRY_NAME")
    .sum("count")
    .withColumnRenamed("sum(count)", "destination_total")
    .sort(desc("destination_total"))
    .limit(5)
    .show()
)

In [None]:
# Same top-five pipeline as the show() variant, but print its plan instead
# of executing it for output.
from pyspark.sql.functions import desc

(
    flightData2015
    .groupBy("DEST_COUNTRY_NAME")
    .sum("count")
    .withColumnRenamed("sum(count)", "destination_total")
    .sort(desc("destination_total"))
    .limit(5)
    .explain()
)

# Chapter 4 - Structured API Overview

In [None]:
df = spark.range(500).toDF("number")
df.select(df["number"] + 10)

In [None]:
#rows
spark.range(2).collect()

In [None]:
# Import only the type that is used; `import *` would pull every public name
# of pyspark.sql.types into the notebook namespace.
from pyspark.sql.types import ByteType

b = ByteType()

# Chapter 5 - Basic Structured Operations

In [None]:
df = spark.read.format("json").load("dbfs:/FileStore/2015_summary.json")

In [None]:
df.printSchema()

## Schemas

In [None]:
spark.read.format("json").load("dbfs:/FileStore/2015_summary.json").schema

In [None]:
# Build the flight-summary schema by hand and read the JSON file with it
# instead of relying on an inferred schema.
from pyspark.sql.types import StructField, StructType, StringType, LongType

myManualSchema = StructType([
    StructField("DEST_COUNTRY_NAME", StringType(), True),
    StructField("ORIGIN_COUNTRY_NAME", StringType(), True),
    StructField("count", LongType(), False, metadata={"hello": "world"}),
])

df = (
    spark.read.format("json")
    .schema(myManualSchema)
    .load("dbfs:/FileStore/2015_summary.json")
)

## Columns and Expressions

In [None]:
from pyspark.sql.functions import col, column
col("someColumnName")
column("someColumnName")

In [None]:
spark.read.format("json").load("dbfs:/FileStore/2015_summary.json").columns

## Expressions

In [None]:
(((col("someCol") + 5) * 200) - 6) < col("otherCol")

In [None]:
from pyspark.sql.functions import expr
expr("(((someCol + 5) * 200) - 6) < otherCol")

## Records and Rows

In [None]:
df.first()

In [None]:
from pyspark.sql import Row
myRow = Row("Hello", None, 1, False)

In [None]:

print(myRow[0])
myRow[2]


## DataFrame Transformations

In [None]:
df = spark.read.format("json").load("dbfs:/FileStore/2015_summary.json")
df.createOrReplaceTempView("dfTable")

In [None]:
# Create a one-row DataFrame from a Row object and an explicit schema.
from pyspark.sql import Row
from pyspark.sql.types import StructField, StructType, StringType, LongType

myManualSchema = StructType([
    StructField("some", StringType(), True),
    StructField("col", StringType(), True),
    StructField("names", LongType(), False),
])
myRow = Row("Hello", None, 1)
myDf = spark.createDataFrame([myRow], myManualSchema)
myDf.show()

## select and selectExpr

In [None]:
df.select("DEST_COUNTRY_NAME").show(2)

In [None]:
df.select("DEST_COUNTRY_NAME", "ORIGIN_COUNTRY_NAME").show(2)

In [None]:
# Select the same column three times, referenced via expr(), col() and column().
from pyspark.sql.functions import expr, col, column

(
    df.select(
        expr("DEST_COUNTRY_NAME"),
        col("DEST_COUNTRY_NAME"),
        column("DEST_COUNTRY_NAME"),
    )
    .show(2)
)


In [None]:
df.select(expr("DEST_COUNTRY_NAME AS destination")).show(2)

In [None]:
df.select(expr("DEST_COUNTRY_NAME as destination"))\
.show(2)

In [None]:
df.select(expr("DEST_COUNTRY_NAME as destination").alias("DEST_COUNTRY_NAME"))\
.show(2)

In [None]:
df.selectExpr("DEST_COUNTRY_NAME as newColumnName", "DEST_COUNTRY_NAME").show(2)

In [None]:
df.selectExpr(
"*", 
"(DEST_COUNTRY_NAME = ORIGIN_COUNTRY_NAME) as withinCountry")\
.show(2)

In [None]:
df.selectExpr("avg(count)", "count(distinct(DEST_COUNTRY_NAME))").show(2)

In [None]:
from pyspark.sql.functions import lit
df.select(expr("*"), lit(1).alias("One")).show(2)

## Adding Columns

In [None]:
df.withColumn("numberOne", lit(1)).show(2)

In [None]:
df.withColumn("withinCountry", expr("ORIGIN_COUNTRY_NAME == DEST_COUNTRY_NAME"))\
.show(2)

In [None]:
df.withColumn("Destination", expr("DEST_COUNTRY_NAME")).columns

## Renaming Columns

In [None]:
df.withColumnRenamed("DEST_COUNTRY_NAME", "dest").columns

## Reserved Characters and Keywords

In [None]:
dfWithLongColName = df.withColumn(
"This Long Column-Name",
expr("ORIGIN_COUNTRY_NAME"))

In [None]:
dfWithLongColName.selectExpr(
"`This Long Column-Name`",
"`This Long Column-Name` as `new col`")\
.show(2)
dfWithLongColName.createOrReplaceTempView("dfTableLong")

In [None]:
dfWithLongColName.select(expr("`This Long Column-Name`")).columns

## Removing Columns

In [None]:
df.drop("ORIGIN_COUNTRY_NAME").columns

In [None]:
dfWithLongColName.drop("ORIGIN_COUNTRY_NAME", "DEST_COUNTRY_NAME")

## Changing a Column’s Type (cast)

In [None]:
df.withColumn("count2", col("count").cast("long"))

## Filtering Rows

In [None]:
df.filter(col("count") < 2).show(2)

In [None]:
df.where("count < 2").show(2)

In [None]:
df.where(col("count") < 2).where(col("ORIGIN_COUNTRY_NAME") != "Croatia")\
.show(2)

## Getting Unique Rows

In [None]:
df.select("ORIGIN_COUNTRY_NAME", "DEST_COUNTRY_NAME").distinct().count()

In [None]:
df.select("ORIGIN_COUNTRY_NAME").distinct().count()

## Random Samples

In [None]:
seed = 5
withReplacement = False
fraction = 0.5
df.sample(withReplacement, fraction, seed).count()

## Random Splits

In [None]:
dataFrames = df.randomSplit([0.25, 0.75], seed)
dataFrames[0].count() > dataFrames[1].count() # False

## Concatenating and Appending Rows (Union)

In [None]:
from pyspark.sql import Row
schema = df.schema
newRows =[Row("New Country", "Other Country", 5), Row("New Country 2", "Other Country 3", 1)]

In [None]:
parallelizedRows = spark.sparkContext.parallelize(newRows)


In [None]:
newDF = spark.createDataFrame(parallelizedRows, schema)

df.union(newDF)\
.where("count = 1")\
.where(col("ORIGIN_COUNTRY_NAME") != "United States")\
.show()

## Sorting Rows

In [None]:
df.sort("count").show(5)


In [None]:
df.orderBy(col("count"), col("DEST_COUNTRY_NAME")).show(5)

In [None]:
df.orderBy("count", "DEST_COUNTRY_NAME").show(5)


In [None]:
from pyspark.sql.functions import desc, asc
df.orderBy(expr("count desc")).show(2)

In [None]:
df.orderBy(col("count").desc(), col("DEST_COUNTRY_NAME").asc()).show(2)

## Limit

In [None]:
df.limit(5).show()

In [None]:
df.orderBy(expr("count desc")).limit(6).show()

## Repartition and Coalesce

In [None]:
df.rdd.getNumPartitions() # 1

In [None]:
df.repartition(5).show()

In [None]:
df.repartition(col("DEST_COUNTRY_NAME")).show()

In [None]:
df.repartition(5, col("DEST_COUNTRY_NAME")).show()

In [None]:
df.repartition(5, col("DEST_COUNTRY_NAME")).coalesce(2)

## Collecting Rows to the Driver

In [None]:
collectDF = df.limit(10)
collectDF.take(5) # take works with an Integer count


In [None]:
collectDF.show() # this prints it out nicely


In [None]:
collectDF.show(5, False)


In [None]:
collectDF.collect()

In [None]:
collectDF.toLocalIterator()

# Chapter 7 - Aggregations

In [None]:
# Load the online-retail CSV (header row, inferred types), reduce it to five
# partitions, cache it and expose it to SQL as "dfTable".
df = (
    spark.read.format("csv")
    .option("header", "true")
    .option("inferSchema", "true")
    .load("dbfs:/FileStore/regu-datasets/online_retail_dataset.csv")
    .coalesce(5)
)
df.cache()
df.createOrReplaceTempView("dfTable")

## count

In [None]:
from pyspark.sql.functions import count
df.select(count("StockCode")).show()

In [None]:
from pyspark.sql.functions import countDistinct
df.select(countDistinct("StockCode")).show()

In [None]:
from pyspark.sql.functions import approx_count_distinct
df.select(approx_count_distinct("StockCode",0.1)).show()

## first and last

In [None]:
from pyspark.sql.functions import first, last
df.select(first("StockCode"), last("StockCode")).show()

## min and max

In [None]:
# Reference min/max through the module alias: importing them by name would
# shadow Python's built-in min() and max() in the notebook namespace.
from pyspark.sql import functions as F

df.select(F.min("Quantity"), F.max("Quantity")).show()

## sum

In [None]:
# Reference sum through the module alias so Python's built-in sum() is not
# shadowed in the notebook namespace.
from pyspark.sql import functions as F

df.select(F.sum("Quantity")).show()

In [None]:
# Sum over the distinct values of Quantity.
# NOTE(review): sumDistinct is deprecated since Spark 3.2 in favor of
# sum_distinct — confirm the cluster's Spark version before switching.
from pyspark.sql.functions import sumDistinct
df.select(sumDistinct("Quantity")).show()

In [None]:
# Compute the average three ways (sum/count, avg, SQL mean) and show them
# side by side. The functions are reached through the module alias so that
# Python's built-in sum() is not shadowed in the notebook namespace.
from pyspark.sql import functions as F

df.select(
    F.count("Quantity").alias("total_transactions"),
    F.sum("Quantity").alias("total_purchases"),
    F.avg("Quantity").alias("avg_purchases"),
    F.expr("mean(Quantity)").alias("mean_purchases"),
).selectExpr(
    "total_purchases/total_transactions",
    "avg_purchases",
    "mean_purchases",
).show()

## Variance and Standard Deviation

In [None]:
# Population and sample variance / standard deviation of Quantity.
from pyspark.sql.functions import stddev_pop, stddev_samp, var_pop, var_samp

df.select(
    var_pop("Quantity"),
    var_samp("Quantity"),
    stddev_pop("Quantity"),
    stddev_samp("Quantity"),
).show()

## grouping

In [None]:
df.groupBy("InvoiceNo", "CustomerId").count().show(3)

In [None]:
from pyspark.sql.functions import count
df.groupBy("InvoiceNo").agg(
count("Quantity").alias("quan"),
expr("count(Quantity)")).show()

In [None]:
df.groupBy("InvoiceNo").agg(expr("avg(Quantity)"),expr("stddev_pop(Quantity)"))\
.show()

## Window Functions

In [None]:
from pyspark.sql.functions import col, to_date
dfWithDate = df.withColumn("date", to_date(col("InvoiceDate"), "MM/d/yyyy H:mm"))
dfWithDate.createOrReplaceTempView("dfWithDate")

In [None]:
from pyspark.sql.window import Window
from pyspark.sql.functions import desc

# Per customer and date, order rows by Quantity (largest first) and frame
# from the start of the partition up to the current row.
windowSpec = (
    Window
    .partitionBy("CustomerId", "date")
    .orderBy(desc("Quantity"))
    .rowsBetween(Window.unboundedPreceding, Window.currentRow)
)

In [None]:
# Maximum Quantity over the window. The function is reached through the
# module alias so Python's built-in max() is not shadowed in the notebook.
from pyspark.sql import functions as F

maxPurchaseQuantity = F.max(F.col("Quantity")).over(windowSpec)

In [None]:
from pyspark.sql.functions import dense_rank, rank
purchaseDenseRank = dense_rank().over(windowSpec)
purchaseRank = rank().over(windowSpec)

In [None]:
# Switch datetime parsing to the legacy parser policy for this session.
# NOTE(review): presumably needed for the "MM/d/yyyy H:mm" pattern passed to
# to_date when building dfWithDate — confirm, and confirm it runs before the
# first action on dfWithDate.
spark.sql("set spark.sql.legacy.timeParserPolicy=LEGACY")


In [None]:
# Show, for rows with a known customer, the quantity together with its rank,
# dense rank and running maximum over the window defined above.
from pyspark.sql.functions import col

(
    dfWithDate
    .where("CustomerId IS NOT NULL")
    .orderBy("CustomerId")
    .select(
        col("CustomerId"),
        col("date"),
        col("Quantity"),
        purchaseRank.alias("quantityRank"),
        purchaseDenseRank.alias("quantityDenseRank"),
        maxPurchaseQuantity.alias("maxPurchaseQuantity"),
    )
    .show()
)

# Chapter 8 - Joins

In [None]:
# Three small in-memory DataFrames used by the join examples below.

# People, each pointing at a graduate program id and a list of status ids.
person = (
    spark.createDataFrame([
        (0, "Bill Chambers", 0, [100]),
        (1, "Matei Zaharia", 1, [500, 250, 100]),
        (2, "Michael Armbrust", 1, [250, 100]),
    ])
    .toDF("id", "name", "graduate_program", "spark_status")
)

# Graduate programs keyed by id.
graduateProgram = (
    spark.createDataFrame([
        (0, "Masters", "School of Information", "UC Berkeley"),
        (2, "Masters", "EECS", "UC Berkeley"),
        (1, "Ph.D.", "EECS", "UC Berkeley"),
    ])
    .toDF("id", "degree", "department", "school")
)

# Status labels keyed by id.
sparkStatus = (
    spark.createDataFrame([
        (500, "Vice President"),
        (250, "PMC Member"),
        (100, "Contributor"),
    ])
    .toDF("id", "status")
)

In [None]:
person.createOrReplaceTempView("person")
graduateProgram.createOrReplaceTempView("graduateProgram")
sparkStatus.createOrReplaceTempView("sparkStatus")

## Inner Joins

In [None]:
joinExpression = person["graduate_program"] == graduateProgram['id']

In [None]:
person.join(graduateProgram, joinExpression).show()

In [None]:
# Same join as the previous cell, with the default join type ("inner")
# spelled out explicitly, matching how the following cells pass joinType.
joinType = "inner"
person.join(graduateProgram, joinExpression, joinType).show()

## Outer Joins

In [None]:
joinType = "outer"
person.join(graduateProgram, joinExpression, joinType).show()

In [None]:
joinType = "left_outer"

graduateProgram.join(person,joinExpression,joinType).show()

In [None]:
joinType ="right_outer"

person.join(graduateProgram , joinExpression , joinType).show()

## Semi Joins

In [None]:
joinType = "left_semi"
graduateProgram.join(person, joinExpression, joinType).show()

In [None]:
gradProgram2 = graduateProgram.union(spark.createDataFrame([
(0, "Masters", "Duplicated Row", "Duplicated School")]))
gradProgram2.createOrReplaceTempView("gradProgram2")
gradProgram2.join(person, joinExpression, joinType).show()

In [None]:
joinType = "left_anti"
graduateProgram.join(person, joinExpression, joinType).show()

## Cross Joins

In [None]:
joinType = "cross"
graduateProgram.join(person, joinExpression, joinType).show()

In [None]:
person.crossJoin(graduateProgram).show()

## Handling Duplicate Column Names

In [None]:
gradProgramDupe = graduateProgram.withColumnRenamed("id", "graduate_program")

In [None]:
joinExpr = gradProgramDupe["graduate_program"] == person["graduate_program"]

In [None]:

person.join(gradProgramDupe, joinExpr).show()

In [None]:
person.join(gradProgramDupe,"graduate_program").select("graduate_program").show()

In [None]:
person.join(gradProgramDupe,joinExpr).drop(person["graduate_program"]).select("graduate_program").show()

## How Spark Performs Joins

In [None]:
joinExpr = person["graduate_program"] == graduateProgram["id"]
person.join(graduateProgram, joinExpr).explain()

In [None]:
from pyspark.sql.functions import broadcast

person.join(broadcast(graduateProgram), joinExpr).explain()

# Chapter 9 - Data Sources

In [None]:
# Read the housing CSV in FAILFAST mode with schema inference.
# NOTE(review): no "header" option is set here, unlike the reads in the
# following cells — confirm that treating the first line as data is intended.
read_data = spark.read.format("csv")\
.option("mode", "FAILFAST")\
.option("inferSchema", "true")\
.option("path", "dbfs:/FileStore/regu-datasets/housing.csv")\
.load()

In [None]:
read_data.show()

In [None]:
spark.read.format("csv")\
.option("header", "true")\
.option("mode", "FAILFAST")\
.option("inferSchema", "true")\
.load("dbfs:/FileStore/regu-datasets/housing.csv")

In [None]:
csvFile = spark.read.format("csv")\
.option("header", "true")\
.option("mode", "FAILFAST")\
.option("inferSchema", "true")\
.load("dbfs:/FileStore/regu-datasets/housing.csv")

In [None]:
csvFile.write.format("csv").mode("overwrite").option("sep", "\t")\
.save("dbfs:/FileStore/regu-datasets/housing_tsv.tsv")

In [None]:
spark.read.format("json").option("mode", "FAILFAST")\
.option("inferSchema", "true")\
.load("dbfs:/FileStore/2015_summary.json").show(5)

In [None]:
csvFile.write.format("json").mode("overwrite").save("dbfs:/FileStore/regu-datasets/my-json-file.json")

In [None]:
csvFile.write.format("parquet").mode("overwrite").save("dbfs:/FileStore/regu-datasets/2010-summary.parquet")

In [None]:
spark.read.format("parquet")\
.load("dbfs:/FileStore/regu-datasets/2010-summary.parquet").show(5)

In [None]:
csvFile.write.format("parquet").mode("overwrite")\
.save("dbfs:/FileStore/regu-datasets/my-parquet-file.parquet")

In [None]:
# NOTE(review): despite its name, csvFile is rebound here to the JSON flight
# summary; the partitioned writes that follow use this data, not the housing CSV.
# "inferSchema" looks like a CSV-reader option — confirm it has any effect for JSON.
csvFile = spark.read.format("json").option("mode", "FAILFAST")\
.option("inferSchema", "true")\
.load("dbfs:/FileStore/2015_summary.json")

In [None]:
csvFile.limit(10).write.mode("overwrite").partitionBy("DEST_COUNTRY_NAME")\
.save("dbfs:/FileStore/regu-datasets/partitioned-files.parquet")

In [None]:
csvFile = spark.read.format("json").option("mode", "FAILFAST")\
.option("inferSchema", "true")\
.load("dbfs:/FileStore/2015_summary.json")
csvFile.limit(10).write.option("maxRecordsPerFile",500).mode("overwrite").partitionBy("DEST_COUNTRY_NAME")\
.save("dbfs:/FileStore/regu-datasets/maxRecordsPerFile-partitioned-files.parquet")

# Chapter 10 - Spark SQL

In [None]:
spark.sql("SELECT 1 + 1").show()

In [None]:
spark.read.json("dbfs:/FileStore/2015_summary.json")\
.createOrReplaceTempView("some_sql_view")

In [None]:
spark.sql("""
SELECT DEST_COUNTRY_NAME , sum(count) FROM some_sql_view GROUP BY DEST_COUNTRY_NAME""")\
.where("DEST_COUNTRY_NAME LIKE 'S%'").where("`sum(count)` > 10")\
.count() 

In [None]:
%sql
CREATE TABLE flights1 (
DEST_COUNTRY_NAME STRING, ORIGIN_COUNTRY_NAME STRING, count LONG)
USING JSON OPTIONS (path 'dbfs:/FileStore/2015_summary.json')

In [None]:
%sql
-- CSV-backed table with an explicit schema; the first line of the file is
-- treated as the header. The path points at the CSV export of the flight
-- summary: reading the .json file through the csv source would not yield
-- the declared three columns.
CREATE TABLE flights_csv (
DEST_COUNTRY_NAME STRING,
ORIGIN_COUNTRY_NAME STRING COMMENT "remember, the US will be most prevalent",
count LONG)
USING csv OPTIONS (header true, path 'dbfs:/FileStore/2015_summary.csv')

In [None]:
%sql
CREATE TABLE flights_from_select1 USING parquet AS SELECT * FROM flights1

In [None]:
%sql
CREATE TABLE partitioned_flights1 USING parquet PARTITIONED BY (DEST_COUNTRY_NAME)
AS SELECT DEST_COUNTRY_NAME, ORIGIN_COUNTRY_NAME, count FROM flights1 LIMIT 5

## Creating External Tables

In [None]:
%sql
CREATE EXTERNAL TABLE hive_flights (
DEST_COUNTRY_NAME STRING, ORIGIN_COUNTRY_NAME STRING, count LONG)
ROW FORMAT DELIMITED FIELDS TERMINATED BY ',' LOCATION 'dbfs:/data/flight-data-hive'

## Inserting into Tables

In [None]:
%sql
-- Append up to 20 rows selected from flights1 to flights_from_select.
-- NOTE(review): this notebook creates flights_from_select1, not
-- flights_from_select — confirm the target table exists in the metastore.
INSERT INTO flights_from_select 
SELECT DEST_COUNTRY_NAME, ORIGIN_COUNTRY_NAME , count FROM flights1 LIMIT 20

In [None]:
%sql
-- Insert up to 12 rows into the static partition DEST_COUNTRY_NAME="UNITED STATES".
-- NOTE(review): the query results shown in this notebook spell the value
-- 'United States'; if the comparison is case-sensitive this WHERE clause
-- matches nothing — confirm the intended casing.
-- NOTE(review): this notebook creates partitioned_flights1 and flights1 —
-- confirm that partitioned_flights and flights exist in the metastore.
INSERT INTO partitioned_flights
PARTITION (DEST_COUNTRY_NAME="UNITED STATES")
SELECT ORIGIN_COUNTRY_NAME,count  FROM flights
WHERE DEST_COUNTRY_NAME='UNITED STATES' LIMIT 12

## Describing and Refreshing Table Metadata

In [None]:
%sql
DESCRIBE TABLE flights_csv

col_name,data_type,comment
DEST_COUNTRY_NAME,string,
ORIGIN_COUNTRY_NAME,string,"remember, the US will be most prevalent"
count,bigint,


In [None]:
%sql 
SHOW PARTITIONS partitioned_flights

partition
DEST_COUNTRY_NAME=Afghanistan
DEST_COUNTRY_NAME=Egypt
DEST_COUNTRY_NAME=Equatorial Guinea
DEST_COUNTRY_NAME=Federated States of Micronesia
DEST_COUNTRY_NAME=Grenada
DEST_COUNTRY_NAME=India
DEST_COUNTRY_NAME=Ireland
DEST_COUNTRY_NAME=Marshall Islands
DEST_COUNTRY_NAME=Netherlands
DEST_COUNTRY_NAME=Romania


In [None]:
%sql 
REFRESH table partitioned_flights

In [None]:
%sql
MSCK REPAIR TABLE partitioned_flights

## Dropping Tables

In [None]:
%sql
DROP TABLE IF EXISTS flights_csv;

## Caching Tables

In [None]:
%sql 
CACHE TABLE flights

In [None]:
%sql
UNCACHE TABLE FLIGHTS

## Views

In [None]:
%sql

CREATE VIEW just_usa_view AS
SELECT * FROM flights WHERE dest_country_name = 'United States'

In [None]:
%sql 
CREATE TEMP VIEW just_usa_view_temp AS
SELECT * FROM flights WHERE dest_country_name = 'United States'

In [None]:
%sql 
CREATE GLOBAL TEMP VIEW just_usa_global_view_temp AS
SELECT * FROM flights WHERE dest_country_name = 'United States'

In [None]:
%sql show tables

database,tableName,isTemporary
default,bucketedfiles,False
default,city_sm,False
default,flight_summary_2015,False
default,flights,False
default,flights1,False
default,flights_2,False
default,flights_3,False
default,flights_from_select,False
default,flights_from_select1,False
default,flights_new,False


In [None]:
%sql CREATE OR REPLACE TEMP VIEW just_usa_view_temp AS
SELECT * FROM flights WHERE dest_country_name = 'United States'

In [None]:
%sql 
SELECT * FROM just_usa_view_temp

DEST_COUNTRY_NAME,ORIGIN_COUNTRY_NAME,count
United States,Romania,15
United States,Croatia,1
United States,Ireland,344
United States,India,62
United States,Singapore,1
United States,Grenada,62
United States,Sint Maarten,325
United States,Marshall Islands,39
United States,Paraguay,6
United States,Gibraltar,1


In [None]:
%sql DROP VIEW IF EXISTS just_usa_view;

## Databases

In [None]:
%sql SHOW DATABASES

databaseName
default


In [None]:
%sql
CREATE DATABASE some_db

In [None]:
%sql show databases

databaseName
default
some_db


In [None]:
%sql USE some_db

In [None]:
%sql show tables

database,tableName,isTemporary
,dftable,True
,dftablelong,True
,dfwithdate,True
,flight_data_2015,True
,gradprogram2,True
,graduateprogram,True
,just_usa_view_temp,True
,person,True
,some_sql_view,True
,sparkstatus,True


In [None]:
%sql SELECT * FROM default.flights

DEST_COUNTRY_NAME,ORIGIN_COUNTRY_NAME,count
United States,Romania,15
United States,Croatia,1
United States,Ireland,344
Egypt,United States,15
United States,India,62
United States,Singapore,1
United States,Grenada,62
Costa Rica,United States,588
Senegal,United States,40
Moldova,United States,1


In [None]:
%sql select current_database()

current_database()
some_db


In [None]:
%sql use default

In [None]:
%sql DROP DATABASE IF EXISTS some_db 

In [None]:
%sql 
show databases

databaseName
default


## Select Statements

In [None]:
%sql 
SELECT * FROM flights

DEST_COUNTRY_NAME,ORIGIN_COUNTRY_NAME,count
United States,Romania,15
United States,Croatia,1
United States,Ireland,344
Egypt,United States,15
United States,India,62
United States,Singapore,1
United States,Grenada,62
Costa Rica,United States,588
Senegal,United States,40
Moldova,United States,1


In [None]:
%sql 
SELECT * FROM flights where DEST_COUNTRY_NAME = 'United States' order by ORIGIN_COUNTRY_NAME LIMIT 20

DEST_COUNTRY_NAME,ORIGIN_COUNTRY_NAME,count
United States,Angola,13
United States,Anguilla,38
United States,Antigua and Barbuda,117
United States,Argentina,141
United States,Aruba,342
United States,Australia,258
United States,Austria,63
United States,Azerbaijan,21
United States,Bahrain,1
United States,Barbados,130


In [None]:
%sql
SELECT
CASE WHEN DEST_COUNTRY_NAME = 'UNITED STATES' THEN 1
WHEN DEST_COUNTRY_NAME = 'Egypt' THEN 0
ELSE -1 END 
FROM partitioned_flights

CASE WHEN (DEST_COUNTRY_NAME = UNITED STATES) THEN 1 WHEN (DEST_COUNTRY_NAME = Egypt) THEN 0 ELSE -1 END
-1
0
-1
-1
-1
-1
-1
-1
-1
-1


## Structs and Lists

In [None]:
%sql CREATE VIEW IF NOT EXISTS nested_data AS
SELECT (DEST_COUNTRY_NAME, ORIGIN_COUNTRY_NAME) as country, count FROM flights

In [None]:
%sql SELECT * FROM nested_data

country,count
"List(United States, Romania)",15
"List(United States, Croatia)",1
"List(United States, Ireland)",344
"List(Egypt, United States)",15
"List(United States, India)",62
"List(United States, Singapore)",1
"List(United States, Grenada)",62
"List(Costa Rica, United States)",588
"List(Senegal, United States)",40
"List(Moldova, United States)",1


In [None]:
%sql SELECT country.DEST_COUNTRY_NAME, count FROM nested_data

DEST_COUNTRY_NAME,count
United States,15
United States,1
United States,344
Egypt,15
United States,62
United States,1
United States,62
Costa Rica,588
Senegal,40
Moldova,1


In [None]:
%sql SELECT country.*, count FROM nested_data

DEST_COUNTRY_NAME,ORIGIN_COUNTRY_NAME,count
United States,Romania,15
United States,Croatia,1
United States,Ireland,344
Egypt,United States,15
United States,India,62
United States,Singapore,1
United States,Grenada,62
Costa Rica,United States,588
Senegal,United States,40
Moldova,United States,1


In [None]:
%sql SELECT DEST_COUNTRY_NAME as new_name, collect_list(count) as flight_counts,
collect_set(ORIGIN_COUNTRY_NAME) as origin_set
FROM flights GROUP BY DEST_COUNTRY_NAME

new_name,flight_counts,origin_set
Algeria,List(4),List(United States)
Angola,List(15),List(United States)
Anguilla,List(41),List(United States)
Antigua and Barbuda,List(126),List(United States)
Argentina,List(180),List(United States)
Aruba,List(346),List(United States)
Australia,List(329),List(United States)
Austria,List(62),List(United States)
Azerbaijan,List(21),List(United States)
Bahrain,List(19),List(United States)


In [None]:
%sql SELECT DEST_COUNTRY_NAME, ARRAY(1, 2, 3) FROM flights

DEST_COUNTRY_NAME,"array(1, 2, 3)"
United States,"List(1, 2, 3)"
United States,"List(1, 2, 3)"
United States,"List(1, 2, 3)"
Egypt,"List(1, 2, 3)"
United States,"List(1, 2, 3)"
United States,"List(1, 2, 3)"
United States,"List(1, 2, 3)"
Costa Rica,"List(1, 2, 3)"
Senegal,"List(1, 2, 3)"
Moldova,"List(1, 2, 3)"


In [None]:
%sql SELECT DEST_COUNTRY_NAME as new_name, collect_list(count)[0]
FROM flights GROUP BY DEST_COUNTRY_NAME

new_name,collect_list(count)[0]
Algeria,4
Angola,15
Anguilla,41
Antigua and Barbuda,126
Argentina,180
Aruba,346
Australia,329
Austria,62
Azerbaijan,21
Bahrain,19


In [None]:
%sql
CREATE OR REPLACE TEMP VIEW flights_agg AS
SELECT DEST_COUNTRY_NAME, collect_list(count) as collected_counts
FROM flights GROUP BY DEST_COUNTRY_NAME

In [None]:
%sql SELECT explode(collected_counts), DEST_COUNTRY_NAME FROM flights_agg

col,DEST_COUNTRY_NAME
4,Algeria
15,Angola
41,Anguilla
126,Antigua and Barbuda
180,Argentina
346,Aruba
329,Australia
62,Austria
21,Azerbaijan
19,Bahrain


## Functions

In [None]:
%sql SHOW FUNCTIONS

function
!
!=
%
&
*
+
-
/
<
<=


In [None]:
%sql SHOW SYSTEM FUNCTIONS

function
!
!=
%
&
*
+
-
/
<
<=


In [None]:
%sql SHOW USER FUNCTIONS

function
getargument


In [None]:
%sql SHOW FUNCTIONS LIKE "collect*";

function
collect_list
collect_set


## Subqueries

In [None]:
%sql SELECT dest_country_name FROM flights
GROUP BY dest_country_name ORDER BY sum(count) DESC LIMIT 5

dest_country_name
United States
Canada
Mexico
United Kingdom
Japan


In [None]:
%sql SELECT * FROM flights
WHERE origin_country_name IN (SELECT dest_country_name FROM flights
GROUP BY dest_country_name ORDER BY sum(count) DESC LIMIT 5)

DEST_COUNTRY_NAME,ORIGIN_COUNTRY_NAME,count
Egypt,United States,15
Costa Rica,United States,588
Senegal,United States,40
Moldova,United States,1
Guyana,United States,64
Malta,United States,1
Anguilla,United States,41
Bolivia,United States,30
Algeria,United States,4
Turks and Caicos Islands,United States,230


In [None]:
%sql SELECT * FROM flights f1
WHERE EXISTS (SELECT 1 FROM flights f2
WHERE f1.dest_country_name = f2.origin_country_name)
AND EXISTS (SELECT 1 FROM flights f2
WHERE f2.dest_country_name = f1.origin_country_name)

DEST_COUNTRY_NAME,ORIGIN_COUNTRY_NAME,count
United States,Romania,15
United States,Croatia,1
United States,Ireland,344
Egypt,United States,15
United States,India,62
United States,Singapore,1
United States,Grenada,62
Costa Rica,United States,588
Senegal,United States,40
United States,Sint Maarten,325


In [None]:
%sql SELECT *, (SELECT max(count) FROM flights) AS maximum FROM flights

DEST_COUNTRY_NAME,ORIGIN_COUNTRY_NAME,count,maximum
United States,Romania,15,370002
United States,Croatia,1,370002
United States,Ireland,344,370002
Egypt,United States,15,370002
United States,India,62,370002
United States,Singapore,1,370002
United States,Grenada,62,370002
Costa Rica,United States,588,370002
Senegal,United States,40,370002
Moldova,United States,1,370002


## Setting Configuration Values in SQL

In [None]:
%sql SET spark.sql.shuffle.partitions=20

key,value
spark.sql.shuffle.partitions,20


In [None]:
%sql SET spark.sql.inMemoryColumnarStorage.compressed = true

key,value
spark.sql.inMemoryColumnarStorage.compressed,True
