In [1]:
# findspark locates a local Spark installation (SPARK_HOME unless a path is
# passed to init()) and puts its Python libraries on sys.path so that the
# `pyspark` import in the next cell succeeds.
import findspark
findspark.init()

In [2]:
from pyspark.sql import SparkSession

In [3]:
# Reuse the active SparkSession if there is one; otherwise start a new one.
spark = (
    SparkSession
    .builder
    .getOrCreate()
)

In [4]:
# Bare expression: Jupyter renders the SparkSession's repr as this cell's output.
spark

In [5]:
# A single-column DataFrame holding the integers 0..999, with the column named "number".
myRange = spark.range(0, 1000).toDF("number")

In [6]:
# Lazy transformation: keep only the even values. Nothing runs until an action.
divisBy2 = myRange.filter(myRange["number"] % 2 == 0)

In [7]:
# count() is an action: it executes the lazy range -> filter plan above and
# returns how many even numbers lie in [0, 1000).
divisBy2.count()

500

In [8]:
# Read the 2015 flight summary CSV. The first line is the header, and column
# types are inferred from the data (so `count` comes back as an integer).
flightData2015 = spark.read.csv(
    "data/flight-data/csv/2015-summary.csv",
    header=True,
    inferSchema=True,
)

In [9]:
# Fetch the first three rows to the driver as a list of Row objects.
flightData2015.head(3)

[Row(DEST_COUNTRY_NAME='United States', ORIGIN_COUNTRY_NAME='Romania', count=15),
 Row(DEST_COUNTRY_NAME='United States', ORIGIN_COUNTRY_NAME='Croatia', count=1),
 Row(DEST_COUNTRY_NAME='United States', ORIGIN_COUNTRY_NAME='Ireland', count=344)]

In [10]:
# Show the physical plan for an ascending sort on `count`. explain() does not run the query.
flightData2015.orderBy("count").explain()

== Physical Plan ==
*(1) Sort [count#26 ASC NULLS FIRST], true, 0
+- *(1) Project [DEST_COUNTRY_NAME#24, ORIGIN_COUNTRY_NAME#25, count#26]
   +- BatchScan[DEST_COUNTRY_NAME#24, ORIGIN_COUNTRY_NAME#25, count#26] CSVScan Location: InMemoryFileIndex[file:/F:/Jupyter/Books_SparkTheDefinitiveGuide/Spark-The-Definitive-Guide-maste..., ReadSchema: struct<DEST_COUNTRY_NAME:string,ORIGIN_COUNTRY_NAME:string,count:int>




In [11]:
# Use 5 partitions after shuffles instead of Spark's default of 200, so the
# small aggregations below don't spawn hundreds of near-empty tasks.
# This only affects queries planned after this cell (not the sort shown above).
spark.conf.set("spark.sql.shuffle.partitions", "5")

In [12]:
# Register the DataFrame as a session-scoped temporary view named
# "flightData2015" so it can also be queried through spark.sql(...).
# NOTE(review): no cell shown here queries this view. The SQL-vs-DataFrame
# comparison it was presumably created for appears to be missing.
flightData2015.createOrReplaceTempView("flightData2015")

In [13]:
# DataFrame-API aggregation: number of rows per destination country.
dataFrameWay = (
    flightData2015
    .groupBy("DEST_COUNTRY_NAME")
    .count()
)

In [14]:
# Print the physical plan of the groupBy/count. The saved output shows a
# partial HashAggregate feeding a final HashAggregate over the CSV scan.
dataFrameWay.explain()

== Physical Plan ==
*(1) HashAggregate(keys=[DEST_COUNTRY_NAME#24], functions=[count(1)])
+- *(1) HashAggregate(keys=[DEST_COUNTRY_NAME#24], functions=[partial_count(1)])
   +- *(1) Project [DEST_COUNTRY_NAME#24]
      +- BatchScan[DEST_COUNTRY_NAME#24] CSVScan Location: InMemoryFileIndex[file:/F:/Jupyter/Books_SparkTheDefinitiveGuide/Spark-The-Definitive-Guide-maste..., ReadSchema: struct<DEST_COUNTRY_NAME:string>




In [15]:
# Import the Spark functions module under an alias instead of pulling `max`
# in directly. `from pyspark.sql.functions import max` rebinds the name
# `max` for the whole kernel session, so any later plain-Python max(a, b)
# would silently call Spark's column function instead of the built-in.
from pyspark.sql import functions as F

# Largest value of `count` in the dataset, as a one-element list of Row.
flightData2015.select(F.max("count")).take(1)

[Row(max(count)=370002)]

In [16]:
from pyspark.sql.functions import desc, asc

# Top five destination countries by total number of flights.
# A parenthesised chain replaces the trailing backslashes; limit(5) comes before
# show() so the output has no "only showing top N rows" footer.
(
    flightData2015
    .groupBy("DEST_COUNTRY_NAME")
    .sum("count")
    .withColumnRenamed("sum(count)", "destination_total")
    .sort("destination_total", ascending=False)
    .limit(5)
    .show()
)

+-----------------+-----------------+
|DEST_COUNTRY_NAME|destination_total|
+-----------------+-----------------+
|    United States|           411352|
|           Canada|             8399|
|           Mexico|             7140|
|   United Kingdom|             2025|
|            Japan|             1548|
+-----------------+-----------------+



In [17]:
# Load the same 2015 flight summary from JSON and expose it to SQL as "dfTable".
df = spark.read.json("data/flight-data/json/2015-summary.json")
df.createOrReplaceTempView("dfTable")

In [18]:
# Project a single column (given as a Column object) and show two rows.
df.select(df["DEST_COUNTRY_NAME"]).show(n=2)

+-----------------+
|DEST_COUNTRY_NAME|
+-----------------+
|    United States|
|    United States|
+-----------------+
only showing top 2 rows



In [19]:
# select() also accepts a list of column names.
df.select(["DEST_COUNTRY_NAME", "ORIGIN_COUNTRY_NAME"]).show(n=2)

+-----------------+-------------------+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|
+-----------------+-------------------+
|    United States|            Romania|
|    United States|            Croatia|
+-----------------+-------------------+
only showing top 2 rows



In [20]:
from pyspark.sql.functions import expr, col, column

# Three equivalent ways to reference the same column: a SQL expression
# string, col(), and its alias column().
df.select(
    expr("DEST_COUNTRY_NAME"),
    col("DEST_COUNTRY_NAME"),
    column("DEST_COUNTRY_NAME"),
).show(2)

+-----------------+-----------------+-----------------+
|DEST_COUNTRY_NAME|DEST_COUNTRY_NAME|DEST_COUNTRY_NAME|
+-----------------+-----------------+-----------------+
|    United States|    United States|    United States|
|    United States|    United States|    United States|
+-----------------+-----------------+-----------------+
only showing top 2 rows



In [21]:
# Boolean flag: true when origin and destination are the same country.
df.withColumn(
    "withinCountry",
    col("ORIGIN_COUNTRY_NAME") == col("DEST_COUNTRY_NAME"),
).show(2)

+-----------------+-------------------+-----+-------------+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|withinCountry|
+-----------------+-------------------+-----+-------------+
|    United States|            Romania|   15|        false|
|    United States|            Croatia|    1|        false|
+-----------------+-------------------+-----+-------------+
only showing top 2 rows



In [22]:
# Rename returns a new DataFrame; df itself is unchanged.
df.withColumnRenamed(existing="DEST_COUNTRY_NAME", new="dest").columns

['dest', 'ORIGIN_COUNTRY_NAME', 'count']

In [23]:
# drop() accepts a Column as well as a name; the result omits ORIGIN_COUNTRY_NAME.
df.drop(df["ORIGIN_COUNTRY_NAME"]).columns

['DEST_COUNTRY_NAME', 'count']

In [24]:
# "bigint" is Spark SQL's name for the same 64-bit LongType as "long".
df.withColumn("count2", df["count"].cast("bigint"))

DataFrame[DEST_COUNTRY_NAME: string, ORIGIN_COUNTRY_NAME: string, count: bigint, count2: bigint]

In [25]:
# Row filter built from a Column predicate.
df.filter(df["count"] < 2).show(n=2)

+-----------------+-------------------+-----+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|
+-----------------+-------------------+-----+
|    United States|            Croatia|    1|
|    United States|          Singapore|    1|
+-----------------+-------------------+-----+
only showing top 2 rows



In [26]:
# The same filter as above, written as a SQL expression string. where() is an alias of filter().
df.where("count < 2").show(n=2)

+-----------------+-------------------+-----+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|
+-----------------+-------------------+-----+
|    United States|            Croatia|    1|
|    United States|          Singapore|    1|
+-----------------+-------------------+-----+
only showing top 2 rows



In [27]:
# Number of unique (origin, destination) pairs. dropDuplicates() with no
# subset is equivalent to distinct().
df.select("ORIGIN_COUNTRY_NAME", "DEST_COUNTRY_NAME").dropDuplicates().count()

256

In [28]:
# Number of unique origin countries: one row survives per distinct origin value.
df.dropDuplicates(["ORIGIN_COUNTRY_NAME"]).count()

125

In [29]:
from pyspark.sql import Row

In [30]:
# Two extra rows shaped like `df`. The Row values are positional, so they must
# follow df's column order (DEST_COUNTRY_NAME, ORIGIN_COUNTRY_NAME, count).
schema = df.schema
newRows = [
    Row("New Country", "Other Country", 5),
    Row("New Country 2", "Other Country 3", 1),
]
parallelizedRows = spark.sparkContext.parallelize(newRows)
newDF = spark.createDataFrame(parallelizedRows, schema=schema)

In [31]:
# union() matches columns by position, not by name. That is safe here because
# newDF was built from df.schema.
(
    df.union(newDF)
    .where("count = 1")
    .where(col("ORIGIN_COUNTRY_NAME") != "United States")
    .show()
)

+-----------------+-------------------+-----+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|
+-----------------+-------------------+-----+
|    United States|            Croatia|    1|
|    United States|          Singapore|    1|
|    United States|          Gibraltar|    1|
|    United States|             Cyprus|    1|
|    United States|            Estonia|    1|
|    United States|          Lithuania|    1|
|    United States|           Bulgaria|    1|
|    United States|            Georgia|    1|
|    United States|            Bahrain|    1|
|    United States|   Papua New Guinea|    1|
|    United States|         Montenegro|    1|
|    United States|            Namibia|    1|
|    New Country 2|    Other Country 3|    1|
+-----------------+-------------------+-----+



# Chapter 6

DataFrame[5: int, five: string, 5.0: double]