###################################################<br>
File: SparkSQL tutorial <br>
Desc: Introduction to SparkSQL <br>
Auth: Shreenidhi Bharadwaj<br>
Date: 9/29/2019<br>
ALL RIGHTS RESERVED | DO NOT DISTRIBUTE<br>
###################################################

This tutorial introduces the fundamentals of Spark SQL.
Quick summary of how Spark SQL works:
a) It uses DataFrames, which are collections of data distributed across data nodes.
b) The driver program typically launches the application. Executors, which run on data nodes,
    are responsible for running the actual code.
c) Results are typically returned to the driver.
You will need a SQL context. You create one as follows:
sqlContext = SQLContext(sc), where sc is the SparkContext.

In [2]:
# Suppress all warnings so that the tutorial output stays uncluttered
import warnings
warnings.filterwarnings(action='ignore')

In [3]:
# Count the lines that lsof prints for the first process whose ps entry
# matches "jupyter-lab" (roughly, how many files that process has open).
# NOTE(review): the recorded result is 0, so the PID extraction may be coming
# back empty; `cut -d ' ' -f 2` depends on the exact column padding of the
# ps output - confirm on the target platform.
!ps -aef | grep jupyter-lab -m 1 | cut -d ' ' -f 2 | xargs lsof -p | wc -l

       0


In [4]:
# Import Libraries
from pyspark import SparkContext
from pyspark.sql import SQLContext
import os

In [5]:
# Initialize the Spark context and the SQL context built on top of it.
# getOrCreate() reuses a context that is already running, so this cell can be
# re-executed safely; a bare SparkContext() raises
# "ValueError: Cannot run multiple SparkContexts at once" in that case.
sc = SparkContext.getOrCreate()
sqlContext = SQLContext(sc)

In [6]:
# A DataFrame resembles a relational table whose records are Row objects.
# A Row is built from keyword arguments, one per field.
from pyspark.sql import Row
cricketer = Row(
    name='Viv Richards',
    age=62,
    country='WI',
    batting_average=58.67,
)

In [7]:
# Fields of a Row can be read as attributes
cricketer.name

'Viv Richards'

In [8]:
# Several fields read by attribute and gathered into one tuple
(cricketer.name, cricketer.age, cricketer.batting_average)

('Viv Richards', 62, 58.67)

In [9]:
# Fields of a Row can also be looked up by name with dictionary-style indexing
cricketer['name'], cricketer['age'], cricketer['batting_average']

('Viv Richards', 62, 58.67)

In [10]:
# Build a table of cricketers from a plain Python list of tuples;
# the schema argument supplies the column names in tuple order.
cricketers = [
    ('Viv Richards', 62, "WI", 58.67),
    ('Greg Chappell', 64, "AUS", 56.83),
    ('Doug Walters', 68, "AUS", 48.9),
    ('VVS Laxman', 41, "IND", 46.7),
    ('Sachin Tendulkar', 42, 'IND', 57.8),
    ('Rahul Dravid', 41, 'IND', 54.5),
    ('Garry Sobers', 78, "WI", 58.9),
    ('Rohan Kanhai', 75, "WI", 51.3),
    ('GR Vishwanath', 68, "IND", 44.3),
]
cricketers_df = sqlContext.createDataFrame(
    cricketers,
    schema=['Name', 'Age', 'Country', 'Average'],
)

In [11]:
# Use show() to print the records as a text table (up to 20 rows when called
# without arguments); show(n) prints the first n records instead.
cricketers_df.show()

+----------------+---+-------+-------+
|            Name|Age|Country|Average|
+----------------+---+-------+-------+
|    Viv Richards| 62|     WI|  58.67|
|   Greg Chappell| 64|    AUS|  56.83|
|    Doug Walters| 68|    AUS|   48.9|
|      VVS Laxman| 41|    IND|   46.7|
|Sachin Tendulkar| 42|    IND|   57.8|
|    Rahul Dravid| 41|    IND|   54.5|
|    Garry Sobers| 78|     WI|   58.9|
|    Rohan Kanhai| 75|     WI|   51.3|
|   GR Vishwanath| 68|    IND|   44.3|
+----------------+---+-------+-------+



In [12]:
# filter() keeps only the rows that satisfy a column condition;
# here, the cricketers whose Country is "WI" (West Indies)
west_indies_only = cricketers_df["Country"] == "WI"
cricketers_df.filter(west_indies_only).show()

+------------+---+-------+-------+
|        Name|Age|Country|Average|
+------------+---+-------+-------+
|Viv Richards| 62|     WI|  58.67|
|Garry Sobers| 78|     WI|   58.9|
|Rohan Kanhai| 75|     WI|   51.3|
+------------+---+-------+-------+



In [13]:
# Inspect the schema of the DataFrame: each column's name, data type and
# nullability are printed as a tree
cricketers_df.printSchema()

root
 |-- Name: string (nullable = true)
 |-- Age: long (nullable = true)
 |-- Country: string (nullable = true)
 |-- Average: double (nullable = true)



In [14]:
# Show the cricketers who are older than 70
older_than_70 = cricketers_df["Age"] > 70
cricketers_df.where(older_than_70).show()

+------------+---+-------+-------+
|        Name|Age|Country|Average|
+------------+---+-------+-------+
|Garry Sobers| 78|     WI|   58.9|
|Rohan Kanhai| 75|     WI|   51.3|
+------------+---+-------+-------+



In [15]:
# A second DataFrame mapping each country abbreviation (ID) to its
# full country name (NAME)
countries = [
    ("IND", "INDIA"),
    ("WI", "WEST INDIES"),
    ("AUS", "AUSTRALIA"),
    ("ENG", "ENGLAND"),
    ("NZ", "NEW ZEALAND"),
]
countries_df = sqlContext.createDataFrame(countries, schema=["ID", "NAME"])
countries_df.show()

+---+-----------+
| ID|       NAME|
+---+-----------+
|IND|      INDIA|
| WI|WEST INDIES|
|AUS|  AUSTRALIA|
|ENG|    ENGLAND|
| NZ|NEW ZEALAND|
+---+-----------+



In [16]:
# Try out the different join types, starting with a full outer join:
# rows from both sides are kept, and the side without a match is null
outer_join_df = cricketers_df.join(
    other=countries_df,
    on=cricketers_df["Country"] == countries_df["ID"],
    how='outer',
)
outer_join_df.show()

+----------------+----+-------+-------+---+-----------+
|            Name| Age|Country|Average| ID|       NAME|
+----------------+----+-------+-------+---+-----------+
|   Greg Chappell|  64|    AUS|  56.83|AUS|  AUSTRALIA|
|    Doug Walters|  68|    AUS|   48.9|AUS|  AUSTRALIA|
|    Viv Richards|  62|     WI|  58.67| WI|WEST INDIES|
|    Garry Sobers|  78|     WI|   58.9| WI|WEST INDIES|
|    Rohan Kanhai|  75|     WI|   51.3| WI|WEST INDIES|
|            null|null|   null|   null|ENG|    ENGLAND|
|            null|null|   null|   null| NZ|NEW ZEALAND|
|      VVS Laxman|  41|    IND|   46.7|IND|      INDIA|
|Sachin Tendulkar|  42|    IND|   57.8|IND|      INDIA|
|    Rahul Dravid|  41|    IND|   54.5|IND|      INDIA|
|   GR Vishwanath|  68|    IND|   44.3|IND|      INDIA|
+----------------+----+-------+-------+---+-----------+



In [17]:
# Inner join: only cricketer/country pairs whose keys match are kept
inner_join_df = cricketers_df.join(
    other=countries_df,
    on=cricketers_df["Country"] == countries_df["ID"],
    how='inner',
)
inner_join_df.show()

+----------------+---+-------+-------+---+-----------+
|            Name|Age|Country|Average| ID|       NAME|
+----------------+---+-------+-------+---+-----------+
|   Greg Chappell| 64|    AUS|  56.83|AUS|  AUSTRALIA|
|    Doug Walters| 68|    AUS|   48.9|AUS|  AUSTRALIA|
|    Viv Richards| 62|     WI|  58.67| WI|WEST INDIES|
|    Garry Sobers| 78|     WI|   58.9| WI|WEST INDIES|
|    Rohan Kanhai| 75|     WI|   51.3| WI|WEST INDIES|
|      VVS Laxman| 41|    IND|   46.7|IND|      INDIA|
|Sachin Tendulkar| 42|    IND|   57.8|IND|      INDIA|
|    Rahul Dravid| 41|    IND|   54.5|IND|      INDIA|
|   GR Vishwanath| 68|    IND|   44.3|IND|      INDIA|
+----------------+---+-------+-------+---+-----------+



In [18]:
# Left outer join: every cricketer is kept, with the matching country
# columns attached on the right
left_outer_join_df = cricketers_df.join(
    other=countries_df,
    on=cricketers_df["Country"] == countries_df["ID"],
    how='left_outer',
)
left_outer_join_df.show()

+----------------+---+-------+-------+---+-----------+
|            Name|Age|Country|Average| ID|       NAME|
+----------------+---+-------+-------+---+-----------+
|   Greg Chappell| 64|    AUS|  56.83|AUS|  AUSTRALIA|
|    Doug Walters| 68|    AUS|   48.9|AUS|  AUSTRALIA|
|    Viv Richards| 62|     WI|  58.67| WI|WEST INDIES|
|    Garry Sobers| 78|     WI|   58.9| WI|WEST INDIES|
|    Rohan Kanhai| 75|     WI|   51.3| WI|WEST INDIES|
|      VVS Laxman| 41|    IND|   46.7|IND|      INDIA|
|Sachin Tendulkar| 42|    IND|   57.8|IND|      INDIA|
|    Rahul Dravid| 41|    IND|   54.5|IND|      INDIA|
|   GR Vishwanath| 68|    IND|   44.3|IND|      INDIA|
+----------------+---+-------+-------+---+-----------+



In [19]:
# Right outer join: every country is kept; countries without any
# cricketer get nulls in the cricketer columns
right_outer_join_df = cricketers_df.join(
    other=countries_df,
    on=cricketers_df["Country"] == countries_df["ID"],
    how='right_outer',
)
right_outer_join_df.show()

+----------------+----+-------+-------+---+-----------+
|            Name| Age|Country|Average| ID|       NAME|
+----------------+----+-------+-------+---+-----------+
|   Greg Chappell|  64|    AUS|  56.83|AUS|  AUSTRALIA|
|    Doug Walters|  68|    AUS|   48.9|AUS|  AUSTRALIA|
|    Viv Richards|  62|     WI|  58.67| WI|WEST INDIES|
|    Garry Sobers|  78|     WI|   58.9| WI|WEST INDIES|
|    Rohan Kanhai|  75|     WI|   51.3| WI|WEST INDIES|
|            null|null|   null|   null|ENG|    ENGLAND|
|            null|null|   null|   null| NZ|NEW ZEALAND|
|      VVS Laxman|  41|    IND|   46.7|IND|      INDIA|
|Sachin Tendulkar|  42|    IND|   57.8|IND|      INDIA|
|    Rahul Dravid|  41|    IND|   54.5|IND|      INDIA|
|   GR Vishwanath|  68|    IND|   44.3|IND|      INDIA|
+----------------+----+-------+-------+---+-----------+



In [20]:
# Inner join projected down to the cricketer's name, the full country name
# (NAME renamed to Country_Name) and the batting average.
# show() only prints and returns None, so the DataFrame is assigned first and
# displayed in a separate statement; chaining .show() onto the assignment
# would leave inner_join_df set to None.
inner_join_df = (
    cricketers_df.join(countries_df, cricketers_df.Country == countries_df.ID, 'inner')
    .select(cricketers_df.Name, countries_df.NAME.alias("Country_Name"), 'Average')
)
inner_join_df.show()

+----------------+------------+-------+
|            Name|Country_Name|Average|
+----------------+------------+-------+
|   Greg Chappell|   AUSTRALIA|  56.83|
|    Doug Walters|   AUSTRALIA|   48.9|
|    Viv Richards| WEST INDIES|  58.67|
|    Garry Sobers| WEST INDIES|   58.9|
|    Rohan Kanhai| WEST INDIES|   51.3|
|      VVS Laxman|       INDIA|   46.7|
|Sachin Tendulkar|       INDIA|   57.8|
|    Rahul Dravid|       INDIA|   54.5|
|   GR Vishwanath|       INDIA|   44.3|
+----------------+------------+-------+



In [21]:
# Read a plain-text file into a DataFrame; the text ends up in a single
# string column named "value", one row per line of the file
text_df = sqlContext.read.text("../data/sample.txt")
text_df.show()

+--------------------+
|               value|
+--------------------+
|The history of th...|
|                    |
+--------------------+



In [22]:
# Turn each line of text into a column holding its list of words
from pyspark.sql.types import ArrayType, StringType
from pyspark.sql.functions import udf, explode


def _lowercase_words(line):
    """Lower-case a line and split it on whitespace into a list of words."""
    return line.lower().split()


# Wrap the Python function as a UDF that returns an array of strings
my_func = udf(_lowercase_words, ArrayType(StringType()))
words_df = text_df.select(my_func(text_df.value).alias('words'))

# explode() emits one output row per array element, giving one word per row
word_column = explode(words_df.words).alias("words")
exploded_df = words_df.select(word_column)
exploded_df.show()

+----------+
|     words|
+----------+
|       the|
|   history|
|        of|
|       the|
|    united|
|   states,|
|         a|
|   country|
|        in|
|     north|
|  america,|
|   started|
|      with|
|       the|
|   arrival|
|        of|
|indigenous|
|    people|
|      from|
|   siberia|
+----------+
only showing top 20 rows



In [23]:
# Tally the occurrences of each distinct word, then list the most
# frequent words first
word_counts = exploded_df.groupBy('words').count()
word_counts.sort("count", ascending=False).show()

+--------+-----+
|   words|count|
+--------+-----+
|     the|   17|
|      of|    8|
|      in|    4|
|     and|    3|
|    most|    2|
| british|    2|
|       a|    2|
|  people|    2|
|colonies|    2|
|  before|    2|
| arrival|    2|
|      to|    2|
|  united|    2|
|   after|    2|
| started|    2|
|      by|    1|
|   along|    1|
|   their|    1|
|  1760s,|    1|
|numerous|    1|
+--------+-----+
only showing top 20 rows



In [24]:
from pyspark.ml.feature import OneHotEncoder, StringIndexer

# Toy dataset with one categorical column
df = sqlContext.createDataFrame([
    (0, "a"),
    (1, "b"),
    (2, "c"),
    (3, "a"),
    (4, "a"),
    (5, "c")
], ["id", "category"])

# StringIndexer learns a label -> numeric index mapping from the data and
# writes the index of each row's category into "categoryIndex"
stringIndexer = StringIndexer(inputCol="category", outputCol="categoryIndex")
model = stringIndexer.fit(df)
indexed = model.transform(df)
indexed.show()

# One-hot encode the numeric index into a sparse vector column
encoder = OneHotEncoder(inputCol="categoryIndex", outputCol="categoryVec")
# In Spark 2.x OneHotEncoder is a Transformer and can be applied directly.
# From Spark 3.0 it is an Estimator that has no transform() and must be
# fitted first. Fitting only when fit() exists keeps the cell working on both.
encoder_model = encoder.fit(indexed) if hasattr(encoder, "fit") else encoder
encoded = encoder_model.transform(indexed)
encoded.show()

+---+--------+-------------+
| id|category|categoryIndex|
+---+--------+-------------+
|  0|       a|          0.0|
|  1|       b|          2.0|
|  2|       c|          1.0|
|  3|       a|          0.0|
|  4|       a|          0.0|
|  5|       c|          1.0|
+---+--------+-------------+

+---+--------+-------------+-------------+
| id|category|categoryIndex|  categoryVec|
+---+--------+-------------+-------------+
|  0|       a|          0.0|(2,[0],[1.0])|
|  1|       b|          2.0|    (2,[],[])|
|  2|       c|          1.0|(2,[1],[1.0])|
|  3|       a|          0.0|(2,[0],[1.0])|
|  4|       a|          0.0|(2,[0],[1.0])|
|  5|       c|          1.0|(2,[1],[1.0])|
+---+--------+-------------+-------------+



In [25]:
from pyspark.ml.feature import Normalizer
from pyspark.ml.linalg import Vectors

# Three sample rows, each pairing an id with a dense feature vector
feature_rows = [
    (0, Vectors.dense([1.0, 0.5, -1.0])),
    (1, Vectors.dense([2.0, 1.0, 1.0])),
    (2, Vectors.dense([4.0, 10.0, 2.0])),
]
dataFrame = sqlContext.createDataFrame(feature_rows, ["id", "features"])

# Rescale each vector by its L^1 norm (p=1.0)
normalizer = Normalizer(inputCol="features", outputCol="normFeatures", p=1.0)
l1NormData = normalizer.transform(dataFrame)
print("Normalized using L^1 norm")
l1NormData.show()

# Reuse the same normalizer with p overridden for this call only,
# so that each vector is rescaled by its L^inf norm instead
inf_norm_params = {normalizer.p: float("inf")}
lInfNormData = normalizer.transform(dataFrame, inf_norm_params)
print("Normalized using L^inf norm")
lInfNormData.show()

Normalized using L^1 norm
+---+--------------+------------------+
| id|      features|      normFeatures|
+---+--------------+------------------+
|  0|[1.0,0.5,-1.0]|    [0.4,0.2,-0.4]|
|  1| [2.0,1.0,1.0]|   [0.5,0.25,0.25]|
|  2|[4.0,10.0,2.0]|[0.25,0.625,0.125]|
+---+--------------+------------------+

Normalized using L^inf norm
+---+--------------+--------------+
| id|      features|  normFeatures|
+---+--------------+--------------+
|  0|[1.0,0.5,-1.0]|[1.0,0.5,-1.0]|
|  1| [2.0,1.0,1.0]| [1.0,0.5,0.5]|
|  2|[4.0,10.0,2.0]| [0.4,1.0,0.2]|
+---+--------------+--------------+



In [26]:
from pyspark.ml.feature import StandardScaler

# Configure a scaler that divides by the standard deviation (withStd=True)
# without subtracting the mean first (withMean=False)
scaler = StandardScaler(
    withMean=False,
    withStd=True,
    inputCol="features",
    outputCol="scaledFeatures",
)

# Fitting computes the summary statistics the scaler needs from the data
scalerModel = scaler.fit(dataFrame)

# Apply the fitted model and print up to 3 rows without truncating the
# long vector values
scaledData = scalerModel.transform(dataFrame)
scaledData.show(n=3, truncate=False)

+---+--------------+------------------------------------------------------------+
|id |features      |scaledFeatures                                              |
+---+--------------+------------------------------------------------------------+
|0  |[1.0,0.5,-1.0]|[0.6546536707079772,0.09352195295828244,-0.6546536707079771]|
|1  |[2.0,1.0,1.0] |[1.3093073414159544,0.1870439059165649,0.6546536707079771]  |
|2  |[4.0,10.0,2.0]|[2.618614682831909,1.870439059165649,1.3093073414159542]    |
+---+--------------+------------------------------------------------------------+



In [27]:
from pyspark.ml.linalg import Vectors
from pyspark.ml.feature import VectorAssembler

# A single-row dataset mixing scalar columns with an existing vector column
sample_rows = [(0, 18, 1.0, Vectors.dense([0.0, 10.0, 0.5]), 1.0)]
column_names = ["id", "hour", "mobile", "userFeatures", "clicked"]
dataset = sqlContext.createDataFrame(sample_rows, column_names)

# VectorAssembler concatenates the listed input columns, in order, into
# one vector column
assembler = VectorAssembler(
    outputCol="features",
    inputCols=["hour", "mobile", "userFeatures"],
)

output = assembler.transform(dataset)
print("Assembled columns 'hour', 'mobile', 'userFeatures' to vector column 'features'")
assembled_view = output.select("features", "clicked")
assembled_view.show(truncate=False)

Assembled columns 'hour', 'mobile', 'userFeatures' to vector column 'features'
+-----------------------+-------+
|features               |clicked|
+-----------------------+-------+
|[18.0,1.0,0.0,10.0,0.5]|1.0    |
+-----------------------+-------+



In [28]:
# Dealing with missing values: build a small DataFrame whose last row
# has no category (None is stored as null)
rows_with_gap = [
    (0, "a"),
    (1, "b"),
    (2, "c"),
    (3, "a"),
    (4, "a"),
    (5, None),
]
df = sqlContext.createDataFrame(rows_with_gap, schema=["id", "category"])
df.show()

+---+--------+
| id|category|
+---+--------+
|  0|       a|
|  1|       b|
|  2|       c|
|  3|       a|
|  4|       a|
|  5|    null|
+---+--------+



In [29]:
# Discard every row that contains a null value
dropped_df = df.dropna()
dropped_df.show()

+---+--------+
| id|category|
+---+--------+
|  0|       a|
|  1|       b|
|  2|       c|
|  3|       a|
|  4|       a|
+---+--------+



In [30]:
# Substitute "b" for the null category instead of dropping the row
replaced_df = df.fillna("b")
replaced_df.show()

+---+--------+
| id|category|
+---+--------+
|  0|       a|
|  1|       b|
|  2|       c|
|  3|       a|
|  4|       a|
|  5|       b|
+---+--------+

