In [1]:
# Autocompletion ("IntelliSense") settings for this Jupyter session:
# enable greedy completion and switch the completer away from Jedi.
%config IPCompleter.greedy=True
%config Completer.use_jedi = False

## Basic Operation on DataFrame

The entry point to programming Spark with the DataFrame API is the Spark Session.
With a SQL Context, applications can create Data Frames from an existing RDD, from a Hive table, or from data sources.

When working in interactive mode, the Spark Context, Spark Session and SQL Context are created by default with the names 'sc', 'spark' and 'sqlContext' respectively.

In a PySpark script, the Spark Context, Spark Session and SQL Context need to be created manually. A Spark Context object is required to create the SQL Context and Spark Session objects. The example below shows how the Spark Session and SQL Context are created.

In [2]:
# findspark makes a local Spark installation importable, so it has to run
# before `import pyspark`.
import os

import findspark

# Spark installation to use: the SPARK_HOME environment variable when it is set,
# otherwise the author's local install (avoids a hard dependency on one machine).
SPARK_HOME = os.environ.get("SPARK_HOME", r"D:\Study\PySpark\Pysparksetup\spark")
findspark.init(SPARK_HOME)

import pyspark
# importing SparkSession to create Spark session
from pyspark.sql import SparkSession

# Display the Spark installation findspark resolved (last expression is echoed).
findspark.find()

In [3]:
from pyspark import SparkConf, SparkContext
from pyspark.sql import SparkSession, SQLContext

# Creating the Spark Context (already discussed). getOrCreate() returns the
# already-running context when there is one, so re-running this cell does not
# fail because a SparkContext already exists; a new context uses this config.
sc_conf = SparkConf().setMaster('local').setAppName('test1')
sc = SparkContext.getOrCreate(conf=sc_conf)
# Creating the Spark Session on top of the Spark Context
spark = SparkSession(sc)
# Creating the SQL Context (used by the SQLContext examples in this notebook)
sqlContext = SQLContext(sc)

In [4]:
# Read the same iris CSV (comma separated, first line is the header) through
# both entry points with identical options: the Spark Session and the SQL Context.
df1 = spark.read.options(sep=',', header=True).csv('dataset/iris/iris.csv')
df2 = sqlContext.read.options(sep=',', header=True).csv('dataset/iris/iris.csv')

In [5]:
# The data frame objects 'df1' and 'df2' don't contain the actual data;
# like an RDD object, they hold the lineage (how the data is to be computed).
# Printing a data frame object therefore shows only its metadata (the schema).
print(df1)

DataFrame[Sepal_Length: string, Sepal_Width: string, Petal_Length: string, Petal_Width: string, Species: string]


In [6]:
# Read the JSON version of the iris data; numeric columns come back typed (double).
iris1_df1 = spark.read.format('json').load('dataset/iris/iris.json')
print(iris1_df1)

DataFrame[Petal_Length: double, Petal_Width: double, Sepal_Length: double, Sepal_Width: double, Species: string]


#### Convert RDD to Data Frame

In [7]:
# Split every text line of the file into a list of fields, then build a data
# frame from that RDD. Without a schema Spark names the columns _1 ... _5.
iris1 = sc.textFile("dataset/iris/iris_site.csv")
iris1_split = iris1.map(lambda text_row: text_row.split(","))
df1 = spark.createDataFrame(data=iris1_split)
df1.show(n=10)


+---+---+---+---+------+
| _1| _2| _3| _4|    _5|
+---+---+---+---+------+
|5.1|3.5|1.4|0.2|setosa|
|4.9|3.0|1.4|0.2|setosa|
|4.7|3.2|1.3|0.2|setosa|
|4.6|3.1|1.5|0.2|setosa|
|5.0|3.6|1.4|0.2|setosa|
|5.4|3.9|1.7|0.4|setosa|
|4.6|3.4|1.4|0.3|setosa|
|5.0|3.4|1.5|0.2|setosa|
|4.4|2.9|1.4|0.2|setosa|
|4.9|3.1|1.5|0.1|setosa|
+---+---+---+---+------+
only showing top 10 rows



#### Convert Data Frame to RDD

In [8]:
# .rdd exposes the data frame as an RDD of Row objects; turn each Row into a plain tuple.
iris1_df1 = spark.read.csv('dataset/iris/iris.csv', sep=',', header=True)
iris1_df1.rdd.map(lambda row: tuple(row)).take(10)

[('5.1', '3.5', '1.4', '0.2', 'setosa'),
 ('4.9', '3.0', '1.4', '0.2', 'setosa'),
 ('4.7', '3.2', '1.3', '0.2', 'setosa'),
 ('4.6', '3.1', '1.5', '0.2', 'setosa'),
 ('5.0', '3.6', '1.4', '0.2', 'setosa'),
 ('5.4', '3.9', '1.7', '0.4', 'setosa'),
 ('4.6', '3.4', '1.4', '0.3', 'setosa'),
 ('5.0', '3.4', '1.5', '0.2', 'setosa'),
 ('4.4', '2.9', '1.4', '0.2', 'setosa'),
 ('4.9', '3.1', '1.5', '0.1', 'setosa')]

#### Display Contents of Data Frame

In [9]:
# Display Contents of Data Frame in Table Format
# ==============================================
# show() prints the contents of a data frame as a table; its argument 'n' is the
# number of rows to display (20 when omitted).
iris1_df1 = spark.read.options(sep=',', header=True).csv('dataset/iris/iris.csv')
iris1_df1.show(5)


+------------+-----------+------------+-----------+-------+
|Sepal_Length|Sepal_Width|Petal_Length|Petal_Width|Species|
+------------+-----------+------------+-----------+-------+
|         5.1|        3.5|         1.4|        0.2| setosa|
|         4.9|        3.0|         1.4|        0.2| setosa|
|         4.7|        3.2|         1.3|        0.2| setosa|
|         4.6|        3.1|         1.5|        0.2| setosa|
|         5.0|        3.6|         1.4|        0.2| setosa|
+------------+-----------+------------+-----------+-------+
only showing top 5 rows



In [10]:
# Display Contents of Data Frame as a List of Rows
# ================================================
# collect() sends ALL the contents of a data frame back to the driver program as
# a list of Row objects, so only use it on data that fits in driver memory.
iris1_df1 = spark.read.options(sep=',', header=True).csv('dataset/iris/iris.csv')
iris1_df1.collect()

[Row(Sepal_Length='5.1', Sepal_Width='3.5', Petal_Length='1.4', Petal_Width='0.2', Species='setosa'),
 Row(Sepal_Length='4.9', Sepal_Width='3.0', Petal_Length='1.4', Petal_Width='0.2', Species='setosa'),
 Row(Sepal_Length='4.7', Sepal_Width='3.2', Petal_Length='1.3', Petal_Width='0.2', Species='setosa'),
 Row(Sepal_Length='4.6', Sepal_Width='3.1', Petal_Length='1.5', Petal_Width='0.2', Species='setosa'),
 Row(Sepal_Length='5.0', Sepal_Width='3.6', Petal_Length='1.4', Petal_Width='0.2', Species='setosa'),
 Row(Sepal_Length='5.4', Sepal_Width='3.9', Petal_Length='1.7', Petal_Width='0.4', Species='setosa'),
 Row(Sepal_Length='4.6', Sepal_Width='3.4', Petal_Length='1.4', Petal_Width='0.3', Species='setosa'),
 Row(Sepal_Length='5.0', Sepal_Width='3.4', Petal_Length='1.5', Petal_Width='0.2', Species='setosa'),
 Row(Sepal_Length='4.4', Sepal_Width='2.9', Petal_Length='1.4', Petal_Width='0.2', Species='setosa'),
 Row(Sepal_Length='4.9', Sepal_Width='3.1', Petal_Length='1.5', Petal_Width='0.1',

In [None]:
# Display First 'n' Rows of Data Frame as a List of Rows
# =======================================================
# head(n) sends only the first n rows back to the driver program as Row objects.
iris1_df1 = spark.read.csv('dataset/iris/iris.csv', sep=',', header=True)
iris1_df1.head(n=10)

#### Data Selection

In [11]:
# Data Selection
# ==============
# Particular columns of a data frame are picked by passing them to select();
# here they are referenced as Column objects of the data frame.
iris1_df1 = spark.read.options(sep=',', header=True).csv('dataset/iris/iris.csv')
iris1_df1.select(iris1_df1.Sepal_Length, iris1_df1.Species).show()

+------------+-------+
|Sepal_Length|Species|
+------------+-------+
|         5.1| setosa|
|         4.9| setosa|
|         4.7| setosa|
|         4.6| setosa|
|         5.0| setosa|
|         5.4| setosa|
|         4.6| setosa|
|         5.0| setosa|
|         4.4| setosa|
|         4.9| setosa|
|         5.4| setosa|
|         4.8| setosa|
|         4.8| setosa|
|         4.3| setosa|
|         5.8| setosa|
|         5.7| setosa|
|         5.4| setosa|
|         5.1| setosa|
|         5.7| setosa|
|         5.1| setosa|
+------------+-------+
only showing top 20 rows



#### Join

In [12]:
# Inner join of the two halves of the iris data on their shared 'ID' column.
# Joining on a column *name* keeps a single 'ID' column in the result.
iris1_df1 = spark.read.csv('dataset/iris/merge/iris_merge1.csv', sep=',', header=True)
iris1_df2 = spark.read.csv('dataset/iris/merge/iris_merge2.csv', sep=',', header=True)
iris1_df1.join(iris1_df2, 'ID', 'inner').show()

+---+------------+-----------+------------+-----------+-------+
| ID|Sepal_Length|Sepal_Width|Petal_Length|Petal_Width|Species|
+---+------------+-----------+------------+-----------+-------+
|  1|         5.1|        3.5|         1.4|        0.2| setosa|
|  2|         4.9|          3|         1.4|        0.2| setosa|
|  3|         4.7|        3.2|         1.3|        0.2| setosa|
|  4|         4.6|        3.1|         1.5|        0.2| setosa|
|  5|           5|        3.6|         1.4|        0.2| setosa|
|  6|         5.4|        3.9|         1.7|        0.4| setosa|
|  7|         4.6|        3.4|         1.4|        0.3| setosa|
|  8|           5|        3.4|         1.5|        0.2| setosa|
|  9|         4.4|        2.9|         1.4|        0.2| setosa|
| 10|         4.9|        3.1|         1.5|        0.1| setosa|
| 11|         5.4|        3.7|         1.5|        0.2| setosa|
| 12|         4.8|        3.4|         1.6|        0.2| setosa|
| 13|         4.8|          3|         1

In [13]:
# After two data frames are joined, the required columns from either table can be
# retrieved with select(), referencing each column through its source data frame.
iris1_df1 = spark.read.csv('dataset/iris/merge/iris_merge1.csv', sep=',', header=True)
iris1_df2 = spark.read.csv('dataset/iris/merge/iris_merge2.csv', sep=',', header=True)
(
    iris1_df1.join(iris1_df2, on='ID', how='inner')
    .select(iris1_df1['Sepal_Length'], iris1_df2['Petal_Length'])
    .show()
)

+------------+------------+
|Sepal_Length|Petal_Length|
+------------+------------+
|         5.1|         1.4|
|         4.9|         1.4|
|         4.7|         1.3|
|         4.6|         1.5|
|           5|         1.4|
|         5.4|         1.7|
|         4.6|         1.4|
|           5|         1.5|
|         4.4|         1.4|
|         4.9|         1.5|
|         5.4|         1.5|
|         4.8|         1.6|
|         4.8|         1.4|
|         4.3|         1.1|
|         5.8|         1.2|
|         5.7|         1.5|
|         5.4|         1.3|
|         5.1|         1.4|
|         5.7|         1.7|
|         5.1|         1.5|
+------------+------------+
only showing top 20 rows



In [14]:
# Join on an explicit column *expression* instead of a column name.
# This form is what you need when the key columns are named differently in the
# two tables (e.g. iris1_df1.ID == iris1_df2.FlowerID if the second file called
# its key 'FlowerID'). Here both keys happen to be named 'ID'. Because the join
# condition is an expression, Spark keeps BOTH key columns, so 'ID' appears twice
# in the result (unlike the join with on='ID', which keeps a single column).
iris1_df1 = spark.read.csv(path='dataset/iris/merge/iris_merge1.csv',sep=',',header=True)
iris1_df2 = spark.read.csv(path='dataset/iris/merge/iris_merge2.csv',sep=',',header=True)
iris1_df1.join(other=iris1_df2,on=(iris1_df1.ID==iris1_df2.ID),how='inner').show()

+------------+-----------+---+---+------------+-----------+-------+
|Sepal_Length|Sepal_Width| ID| ID|Petal_Length|Petal_Width|Species|
+------------+-----------+---+---+------------+-----------+-------+
|         5.1|        3.5|  1|  1|         1.4|        0.2| setosa|
|         4.9|          3|  2|  2|         1.4|        0.2| setosa|
|         4.7|        3.2|  3|  3|         1.3|        0.2| setosa|
|         4.6|        3.1|  4|  4|         1.5|        0.2| setosa|
|           5|        3.6|  5|  5|         1.4|        0.2| setosa|
|         5.4|        3.9|  6|  6|         1.7|        0.4| setosa|
|         4.6|        3.4|  7|  7|         1.4|        0.3| setosa|
|           5|        3.4|  8|  8|         1.5|        0.2| setosa|
|         4.4|        2.9|  9|  9|         1.4|        0.2| setosa|
|         4.9|        3.1| 10| 10|         1.5|        0.1| setosa|
|         5.4|        3.7| 11| 11|         1.5|        0.2| setosa|
|         4.8|        3.4| 12| 12|         1.6| 

In [15]:
# Union
# =====
# union() appends the rows of the second data frame to the first. Columns are
# matched by position, not by name, so both inputs should list them in the same order.
iris1_df1 = spark.read.csv('dataset/iris/union/iris_union1.csv', sep=',', header=True)
iris1_df2 = spark.read.csv('dataset/iris/union/iris_union2.csv', sep=',', header=True)
iris1_df1.union(other=iris1_df2).show()


+------------+-----------+------------+-----------+
|Sepal.Length|Sepal.Width|Petal.Length|Petal.Width|
+------------+-----------+------------+-----------+
|           5|          3|           1|          0|
|         4.6|       null|           2|        0.1|
|         7.2|        3.1|         5.1|          1|
|           8|          4|           7|          2|
|          10|          6|           2|          0|
|         9.2|          0|           4|        0.2|
|        14.4|        6.2|        10.2|          2|
|          16|          8|          14|          4|
+------------+-----------+------------+-----------+



## DataFrame MetaData

In [16]:
# Retrieve Column Names
# =====================
# The 'columns' property returns the column names as a Python list.
iris1_df1 = spark.read.options(header=True, sep=',').csv('dataset/iris/iris.csv')
iris1_df1.columns

['Sepal_Length', 'Sepal_Width', 'Petal_Length', 'Petal_Width', 'Species']

In [18]:
# Retrieve Schema of the Data Frame
# =================================
# The structure of a data frame is described with StructType and StructField.
# StructType is the data type representing a Row and consists of a list of
# StructFields; a StructField is a single field of a StructType. Its arguments are
#   name     - name of the column
#   dataType - data type of the column
#   nullable - boolean value defining whether the column is nullable
# Without an explicit schema, every column of this CSV is read as a string.
iris1_df1 = spark.read.options(sep=',', header=True).csv('dataset/iris/iris.csv')
iris1_df1.schema

StructType(List(StructField(Sepal_Length,StringType,true),StructField(Sepal_Width,StringType,true),StructField(Petal_Length,StringType,true),StructField(Petal_Width,StringType,true),StructField(Species,StringType,true)))

In [19]:
# Display the (column name, data type) pairs of the data frame.
iris1_df1 = spark.read.options(sep=',', header=True).csv('dataset/iris/iris.csv')
iris1_df1.dtypes

[('Sepal_Length', 'string'),
 ('Sepal_Width', 'string'),
 ('Petal_Length', 'string'),
 ('Petal_Width', 'string'),
 ('Species', 'string')]

In [20]:
# Default Structure of Data Frame
# ===============================
# A data frame created from an RDD has no column header: Spark simply names the
# columns _1, _2, ... Also, when data is imported from a CSV file, a float column
# may end up defined as a string column. To overcome this, a structure (schema)
# needs to be imposed on the imported data; the following cells show how.
iris1 = sc.textFile("dataset/iris/iris_site.csv")
iris1_split = iris1.map(lambda text: text.split(","))
# Convert the four measurements to float and keep the species name as a string.
iris1_split = iris1_split.map(lambda fields: [float(v) for v in fields[:4]] + [fields[4]])
df1 = spark.createDataFrame(iris1_split)
df1.show()


+---+---+---+---+------+
| _1| _2| _3| _4|    _5|
+---+---+---+---+------+
|5.1|3.5|1.4|0.2|setosa|
|4.9|3.0|1.4|0.2|setosa|
|4.7|3.2|1.3|0.2|setosa|
|4.6|3.1|1.5|0.2|setosa|
|5.0|3.6|1.4|0.2|setosa|
|5.4|3.9|1.7|0.4|setosa|
|4.6|3.4|1.4|0.3|setosa|
|5.0|3.4|1.5|0.2|setosa|
|4.4|2.9|1.4|0.2|setosa|
|4.9|3.1|1.5|0.1|setosa|
|5.4|3.7|1.5|0.2|setosa|
|4.8|3.4|1.6|0.2|setosa|
|4.8|3.0|1.4|0.1|setosa|
|4.3|3.0|1.1|0.1|setosa|
|5.8|4.0|1.2|0.2|setosa|
|5.7|4.4|1.5|0.4|setosa|
|5.4|3.9|1.3|0.4|setosa|
|5.1|3.5|1.4|0.3|setosa|
|5.7|3.8|1.7|0.3|setosa|
|5.1|3.8|1.5|0.3|setosa|
+---+---+---+---+------+
only showing top 20 rows



In [21]:
# Defining Structure for Data Frame
# =================================
# A table contains multiple fields, so defining its structure means defining
#   -> the number of columns
#   -> the name of each column
#   -> the data type of each column
#
# For structured data every row has the same structure. It is described by a
# StructType built from a collection of StructField objects (one per column).
# StructField takes as input
#   -> name     - name of the column
#   -> dataType - data type of the column
#   -> nullable - boolean value defining whether the column is nullable
from pyspark.sql.types import StructType, StructField, FloatType, StringType

# Four float measurement columns followed by the string 'Species' label.
iris_schema = StructType(
    [StructField(col_name, FloatType(), True)
     for col_name in ("Sepal_Length", "Sepal_Width", "Petal_Length", "Petal_Width")]
    + [StructField("Species", StringType(), True)]
)
print(iris_schema)


StructType(List(StructField(Sepal_Length,FloatType,true),StructField(Sepal_Width,FloatType,true),StructField(Petal_Length,FloatType,true),StructField(Petal_Width,FloatType,true),StructField(Species,StringType,true)))


In [None]:
# Assigning Defined Structure to Data Frame
# =========================================
# When a data frame is created from an RDD, the defined schema is passed to
# createDataFrame() together with the RDD.
from pyspark.sql.types import StructType, StructField, FloatType, StringType

iris_schema = StructType(
    [StructField(col_name, FloatType(), True)
     for col_name in ("Sepal_Length", "Sepal_Width", "Petal_Length", "Petal_Width")]
    + [StructField("Species", StringType(), True)]
)
iris1 = sc.textFile("dataset/iris/iris_site.csv")
iris1_split = iris1.map(lambda text: text.split(","))
iris1_split = iris1_split.map(lambda fields: [float(v) for v in fields[:4]] + [fields[4]])
df1 = spark.createDataFrame(iris1_split, schema=iris_schema)
df1.show()


In [None]:
# Importing with Schema
# =====================
# Supplying the schema to the reader makes the measurement columns floats
# instead of strings.
from pyspark.sql.types import StructType, StructField, FloatType, StringType

iris_schema = StructType(
    [StructField(col_name, FloatType(), True)
     for col_name in ("Sepal_Length", "Sepal_Width", "Petal_Length", "Petal_Width")]
    + [StructField("Species", StringType(), True)]
)
iris1_df1 = spark.read.schema(iris_schema).options(sep=',', header=True).csv('dataset/iris/iris.csv')
iris1_df1.dtypes

In [None]:
# Converting Datatype
# ===================
# Without a schema every CSV column is read as a string. The data type of a
# column is changed by selecting it and applying cast(); alias() keeps the
# original column name instead of the generated cast-expression name.
iris1_df1 = spark.read.csv(path='dataset/iris/iris.csv', sep=',', header=True)
# Only the last expression of a cell is echoed, so print the types explicitly.
print(iris1_df1.dtypes)
iris1_df1.select(
    iris1_df1.Petal_Length.cast("float").alias("Petal_Length"),
    iris1_df1.Sepal_Length.cast("float").alias("Sepal_Length"),
).show()

In [None]:
# Drop Columns
# ============
# drop() returns a new data frame without the given column.
iris1_df1 = spark.read.options(sep=',', header=True).csv('dataset/iris/iris.csv')
iris1_df1.drop(iris1_df1.Species).show()

In [None]:
# Drop by Selection
# =================
# A column can be removed not only with drop(), but also by selecting only the
# columns to keep. No distinct() here: it would additionally remove duplicate
# rows, which has nothing to do with dropping columns.
iris1_df1 = spark.read.csv(path='dataset/iris/iris.csv', sep=',', header=True)
iris1_df1.select("Sepal_Length", "Species").show()

## Basic Descriptive Statistics on Data Frame

#### Sorting Data

In [None]:
# Sort by Petal_Length in descending order. The schema makes Petal_Length a
# float, so the ordering is numeric.
from pyspark.sql.types import StructType, StructField, FloatType, StringType

iris_schema = StructType(
    [StructField(col_name, FloatType(), True)
     for col_name in ("Sepal_Length", "Sepal_Width", "Petal_Length", "Petal_Width")]
    + [StructField("Species", StringType(), True)]
)

iris1_df1 = spark.read.schema(iris_schema).csv('dataset/iris/iris.csv', sep=',', header=True)
iris1_df1.orderBy(iris1_df1.Petal_Length.desc()).show()

In [None]:
# Sorting straight after reading the CSV gives the wrong order: without a schema
# the data types are not identified and every column (including 'income') is a
# string, so the values are compared as text rather than as numbers.
# Map data types to the columns first (next cell) to sort correctly.
buy1 = spark.read.csv('dataset/iris/buy.csv', sep=',', header=True)
buy1.orderBy('income').show()

In [None]:
# With a schema, 'age' and 'income' are floats, so sorting by income is numeric.
from pyspark.sql.types import StructType, StructField, FloatType, StringType

buy_schema = StructType(
    [StructField(col_name, FloatType(), True) for col_name in ("age", "income")]
    + [StructField(col_name, StringType(), True) for col_name in ("gender", "marital", "buys")]
)

buy_df1 = spark.read.schema(buy_schema).csv('dataset/iris/buy.csv', sep=',', header=True)

buy_df1.orderBy("income").show()

#### Filtering Data Based on a Condition

In [None]:
# Keep sepal length and species, restricted to the 'virginica' rows.
iris1_df1 = spark.read.options(sep=',', header=True).csv('dataset/iris/iris.csv')
iris1_df1.select("Sepal_Length", "Species").where(iris1_df1.Species == 'virginica').show()

In [None]:
# To keep only the flowers whose 'Species' is either 'setosa' or 'versicolor',
# pass the list of accepted values to isin().
iris1_df1 = spark.read.csv(path='dataset/iris/iris.csv', sep=',', header=True)
iris1_df1.filter(iris1_df1.Species.isin(['setosa', 'versicolor'])).show()

#### Distinct Count

In [None]:
# Distinct values of the 'Species' column.
iris1_df1 = spark.read.options(sep=',', header=True).csv('dataset/iris/iris.csv')
iris1_df1.select(iris1_df1.Species).distinct().show()

In [None]:
# Number of distinct values in the 'Species' column.
iris1_df1 = spark.read.options(sep=',', header=True).csv('dataset/iris/iris.csv')
iris1_df1.select(iris1_df1["Species"]).distinct().count()

#### Aggregation

In [None]:
# Simple Aggregation: sum of Sepal_Length over the whole data frame
# (agg() on a data frame aggregates a single group containing every row).
iris1_df1 = spark.read.options(sep=',', header=True).csv('dataset/iris/iris.csv')
iris1_df1.groupBy().agg({"Sepal_Length": "sum"}).show()

In [None]:
# Aggregating Grouped Data: mean Sepal_Length per species.
iris1_df1 = spark.read.options(sep=',', header=True).csv('dataset/iris/iris.csv')
iris1_df1.groupby("Species").agg({"Sepal_Length": "mean"}).show()

In [None]:
# Statistical Summary (count, mean, stddev, min, max) for the listed columns.
iris1_df1 = spark.read.options(sep=',', header=True).csv('dataset/iris/iris.csv')
iris1_df1.describe('Sepal_Length', 'Sepal_Width', 'Petal_Length', 'Petal_Width', 'Species').show()

#### Calculating Quantiles of Data

In [None]:
# 40th, 60th and 80th percentiles of Sepal_Length. A relative error of 0 requests
# exact quantiles, which can be expensive on large data.
from pyspark.sql.types import StructType, StructField, FloatType, StringType

iris_schema = StructType(
    [StructField(col_name, FloatType(), True)
     for col_name in ("Sepal_Length", "Sepal_Width", "Petal_Length", "Petal_Width")]
    + [StructField("Species", StringType(), True)]
)
iris1_df1 = spark.read.schema(iris_schema).csv('dataset/iris/iris.csv', sep=',', header=True)
iris1_df1.approxQuantile('Sepal_Length', [0.4, 0.6, 0.8], 0)

#### Multi-Dimension View of Data

In [None]:
# Multi-Dimension View of Data
# ============================
# cube() builds a multi-dimensional report. Here it gives the mean of
# 'Sepal_Length', 'Sepal_Width', 'Petal_Length' and 'Petal_Width' for every
# Species, plus a grand-total row (Species = null) over all flowers.
from pyspark.sql.types import StructType, StructField, FloatType, StringType

iris_schema = StructType(
    [StructField(col_name, FloatType(), True)
     for col_name in ("Sepal_Length", "Sepal_Width", "Petal_Length", "Petal_Width")]
    + [StructField("Species", StringType(), True)]
)
iris1_df1 = spark.read.schema(iris_schema).csv('dataset/iris/iris.csv', sep=',', header=True)
iris1_df1.cube(iris1_df1.Species).mean().show()

#### Co-variance and Correlation

In [None]:
# Sample covariance and (Pearson) correlation between sepal and petal length.
from pyspark.sql.types import StructType, StructField, FloatType, StringType

iris_schema = StructType(
    [StructField(col_name, FloatType(), True)
     for col_name in ("Sepal_Length", "Sepal_Width", "Petal_Length", "Petal_Width")]
    + [StructField("Species", StringType(), True)]
)
iris1_df1 = spark.read.schema(iris_schema).csv('dataset/iris/iris.csv', sep=',', header=True)
stats = iris1_df1.stat
print('co-variance - ', stats.cov('Sepal_Length', 'Petal_Length'))
print('correlation - ', stats.corr('Sepal_Length', 'Petal_Length'))

#### Confusion Matrix 

In [None]:
# crosstab() counts every (value, value) combination of two columns. A real
# confusion matrix crosses actual vs. predicted labels; here 'Species' is crossed
# with itself, so only the diagonal is non-zero.
from pyspark.sql.types import StructType, StructField, FloatType, StringType

iris_schema = StructType(
    [StructField(col_name, FloatType(), True)
     for col_name in ("Sepal_Length", "Sepal_Width", "Petal_Length", "Petal_Width")]
    + [StructField("Species", StringType(), True)]
)
iris1_df1 = spark.read.schema(iris_schema).csv('dataset/iris/iris.csv', sep=',', header=True)
iris1_df1.stat.crosstab('Species', 'Species').show()

#### Creating Temp Table from Data Frame

In [None]:
# Temp tables are created from an existing data frame with either the data
# frame's 'createOrReplaceTempView' or the SQL Context's 'registerDataFrameAsTable'.
# Both examples create a temp table named 'iris1_table1'.
iris1_df1 = spark.read.options(sep=',', header=True).csv('dataset/iris/iris.csv')
iris1_df1.createOrReplaceTempView("iris1_table1")

In [None]:
# Same temp table, registered through the SQL Context.
iris1_df1 = spark.read.options(sep=',', header=True).csv('dataset/iris/iris.csv')
sqlContext.registerDataFrameAsTable(df=iris1_df1, tableName="iris1_table1")

#### Querying Temp Table

In [None]:
# Register the data frame as a temp view, then query it with Spark SQL.
iris1_df1 = spark.read.options(sep=',', header=True).csv('dataset/iris/iris.csv')
iris1_df1.createOrReplaceTempView("iris1_table1")
sql1 = spark.sql("SELECT Sepal_Length, Species from iris1_table1")
sql1.show()

#### Retrieve Available Tables in a Database

In [None]:
# tableNames() lists the tables of a database. The database name is its
# argument; without one, the tables of the default database are returned.
iris1_df1 = spark.read.options(sep=',', header=True).csv('dataset/iris/iris.csv')
sqlContext.registerDataFrameAsTable(df=iris1_df1, tableName="iris1_table1")
sqlContext.tableNames()

In [None]:
# Another way of retrieving table names is tables(): it returns a data frame
# with the database name, the table name, and whether each table is temporary.
sqlContext.tables(dbName=None).show()

#### Convert Table Back to Data Frame

In [None]:
# A registered table can be read back as a data frame by its name.
iris1_df1 = spark.read.options(sep=',', header=True).csv('dataset/iris/iris.csv')
iris1_df1.createOrReplaceTempView("iris1_table1")
df1 = spark.read.table("iris1_table1")
df1.show()

## Hive Integration

#### Creating Hive Context

In [None]:
# Creating Hive Context
# =====================
# Since Spark 2.0, Hive tables can be accessed without a separate Hive Context
# and HiveContext is deprecated; it is created here only to show how.
# HiveContext takes the current Spark Context as its argument.
from pyspark.sql import HiveContext
hc = HiveContext(sparkContext=sc)
print(hc)

#### Accessing Hive Tables

Hive data can be accessed using sqlContext or HiveContext. In this example, we will make use of sqlContext to access 'retailer' table present in 'db' database in Hive.
The first step is to select the Hive database which contains our Hive table to be queried.
###### sqlContext.sql('use db')

Once the database is selected, we can list the tables available in it with the SQL Context's 'tableNames' function.
###### sqlContext.tableNames()

Hive tables can be queried using HiveQL commands furnished using sql function, shown below.
###### sqlContext.sql('select * from retailer').show()


#### Exporting Data Frame To Hive

Spark is meant to process huge amounts of data distributed across many systems. The results of such large-scale processing are usually very big too, and hence need to be stored on distributed systems. In this example, we will store the contents of a Spark Data Frame (the iris dataset) on the Hadoop Distributed File System as a Hive table.

To write data frame contents to a Hive table we need 'saveAsTable', which is part of the 'DataFrameWriter' class. The 'DataFrameWriter' class constructor takes the data frame to be exported as its argument. The 'saveAsTable' function takes two arguments:
name - name of the target table in Hive database
mode - 
append: Append contents of the Data Frame to existing Hive table
overwrite: Overwrite existing Hive table
ignore: Silently ignore this operation if Hive table already exists
error: Throw an exception if Hive table already exists

First select the database where the data is to be written, then use 'DataFrameWriter' and 'saveAsTable' to export the data, as shown below.




In [None]:
from pyspark.sql import DataFrameWriter
from pyspark.sql.types import StructType, StructField, FloatType, StringType

iris_schema = StructType([
    StructField("Sepal_Length", FloatType(), True),
    StructField("Sepal_Width", FloatType(), True),
    StructField("Petal_Length", FloatType(), True),
    StructField("Petal_Width", FloatType(), True),
    StructField("Species", StringType(), True),
])
# Same iris file as every other cell of this notebook.
iris1_df1 = spark.read.csv(path='dataset/iris/iris.csv', sep=',', header=True, schema=iris_schema)
# Select the Hive database 'db' that will hold the exported table.
sqlContext.sql('use db')
# The DataFrameWriter wraps the data frame to export.
dfw = DataFrameWriter(iris1_df1)
# mode='overwrite' replaces the Hive table 'iris' if it already exists.
dfw.saveAsTable(name="iris", mode='overwrite')


### Introduction to User-Defined Functions

Consider a scenario where a calculated column needs to be created in 10 Hive tables by performing complex operations on one of the existing columns in all of those 10 tables. A single user-defined function can be created that performs the operation on its input data and returns the result, and it can then be reused in the SQL queries on all 10 tables.

Python functions can be registered as Spark User-Defined Functions for two purposes.

1. For being used in Spark SQL statements
2. For being used on Spark Data Frame

Both the methods would be discussed in order in the latter sections of this course.

#### UDF for SQL Query

In [None]:
# UDF for SQL Query
# =================
# For a Python function to be used in SQL queries, it must be registered with the
# 'register' function of the 'UDFRegistration' class; sqlContext.udf is the
# UDFRegistration bound to this SQL Context. register takes
#  -> name       - name of the UDF (as used in SQL)
#  -> f          - Python function to be registered as the UDF
#  -> returnType - data type of the Python function's return value
#
# Below, a UDF named 'new_fun' is created that adds 10 to its input argument.
# NULL inputs are passed through as None instead of raising a TypeError.
from pyspark.sql.types import StructType, StructField, FloatType, StringType

sqlContext.udf.register(
    name='new_fun',
    f=lambda var1: None if var1 is None else var1 + 10,
    returnType=FloatType(),
)

# UDF on Temp Tables
iris_schema = StructType([
    StructField("Sepal_Length", FloatType(), True),
    StructField("Sepal_Width", FloatType(), True),
    StructField("Petal_Length", FloatType(), True),
    StructField("Petal_Width", FloatType(), True),
    StructField("Species", StringType(), True),
])
iris1_df1 = spark.read.csv(path='dataset/iris/iris.csv', sep=',', header=True, schema=iris_schema)
sqlContext.registerDataFrameAsTable(iris1_df1, "iris1_table1")
# Query the temp table registered on the previous line.
sqlContext.sql('select new_fun(Sepal_Length) from iris1_table1').show()

# UDF on Hive Tables -- needs the Hive database 'db' containing the 'iris' table
# (see the Hive export example), so it is left commented out here:
# sqlContext.sql('use db')
# sqlContext.sql('SELECT new_fun(Sepal_Length) FROM iris').show()

#### UDF for Data Frame

In [None]:
# UDF for Data Frame
# ==================
# A data frame UDF is created with the 'udf' function of the
# 'pyspark.sql.functions' module. It takes two arguments:
#  -> f          - python function
#  -> returnType - a pyspark.sql.types.DataType object
# Below, a UDF named 'new_fun2' is created that adds 10 to its input argument.
# Columns declared nullable can deliver None, which is passed through instead of
# failing the Spark job with a TypeError.
from pyspark.sql.types import FloatType
from pyspark.sql.functions import udf
new_fun2 = udf(lambda var1: None if var1 is None else var1 + 10, returnType=FloatType())


In [None]:
# Apply the 'new_fun2' UDF to the Sepal_Length column.
from pyspark.sql.types import StructType, StructField, FloatType, StringType

iris_schema = StructType(
    [StructField(col_name, FloatType(), True)
     for col_name in ("Sepal_Length", "Sepal_Width", "Petal_Length", "Petal_Width")]
    + [StructField("Species", StringType(), True)]
)
iris1_df1 = spark.read.schema(iris_schema).csv('dataset/iris/iris.csv', sep=',', header=True)
iris1_df1.select(new_fun2(iris1_df1["Sepal_Length"])).show()


# Optimize Iterative Operations

Caching is used to keep data in memory for fast computation of iterative processes. Data Frames, Temp Tables and RDDs can be cached. Caching works differently for tables and data frames. For tables, caching places the whole table in memory. For data frames, however, caching is lazy, which means that a data frame is not placed in memory when it is cached. Instead, only the subset of the data frame on which some 'Action' is performed after caching gets cached. For example, if a particular RDD has been cached and only 50% of the data in that RDD is used in an Action, then only that 50% of the RDD's data will be cached.

Similar to cache, there is persist, which is also used to improve the execution performance of iterative operations. Unlike cache, persist can also place the data in storage locations other than memory, such as disk. Caching is simply persist with a fixed default storage level (MEMORY_ONLY for RDDs; MEMORY_AND_DISK for data frames). So, if another storage level is required, use the persist function instead.

This section has examples to show how to cache, uncache, persist and unpersist on RDDs, Data Frames and Temp Tables

#### Cache Uncache Implementation

In [None]:
# Cache
# =====
# RDDs and data frames are cached with cache(); tables with cacheTable().
# RDD: cache() marks the RDD for in-memory storage and returns the RDD itself.
iris1 = sc.textFile("dataset/iris/iris_site.csv").cache()
iris1

In [None]:
# Data frame cache: cache() marks the data frame for caching and returns it.
iris1_df1 = spark.read.options(sep=',', header=True).csv('dataset/iris/iris.csv').cache()
iris1_df1

In [None]:
# Table Cache
# cacheTable() returns None, so there is nothing to keep from it; isCached()
# confirms that the temp view is now marked as cached (expected: True).
iris1_df1 = spark.read.csv(path='dataset/iris/iris.csv', sep=',', header=True)
iris1_df1.createOrReplaceTempView("TempView1")
sqlContext.cacheTable("TempView1")
sqlContext.isCached("TempView1")

In [None]:
# Cached tables can be uncached using the 'uncacheTable' function, as shown below.
iris1_df1 = spark.read.csv(path='dataset/iris/iris.csv', sep=',', header=True)
iris1_df1.createOrReplaceTempView("TempView1")
sqlContext.cacheTable("TempView1")
sqlContext.uncacheTable("TempView1")
# Confirm the table is no longer cached (expected: False).
sqlContext.isCached("TempView1")

In [None]:
# Spark Streaming (DStreams): a StreamingContext on top of the existing Spark
# Context 'sc' that groups incoming data into batches of 3 seconds.
from pyspark import streaming
str1 = streaming.StreamingContext(sparkContext=sc, batchDuration=3)


In [None]:
# Read text lines from the TCP socket localhost:8765 (fed by `nc -lk 8765`).
# Full signature: socketTextStream(hostname, port, storageLevel=StorageLevel(True, True, False, False, 2))
data1 = str1.socketTextStream("localhost", 8765)


In [None]:
# For every batch: save the received lines as text files under the
# 'streaming/file' prefix, and print the first elements to the console.
data1.saveAsTextFiles(prefix='streaming/file')
data1.pprint()


In [None]:
# Start receiving data. awaitTermination() without a timeout blocks the kernel
# forever, so wait a bounded time and then stop streaming gracefully while
# keeping the shared Spark Context alive for the rest of the notebook.
STREAM_TIMEOUT_SECONDS = 60  # set to None to run until the stream is stopped
str1.start()
str1.awaitTermination(timeout=STREAM_TIMEOUT_SECONDS)
str1.stop(stopSparkContext=False, stopGraceFully=True)


In [None]:
# Shell command, not Python -- run it in a separate terminal to submit the
# streaming script as a standalone application:
#   spark-submit test1.py localhost 8765


In [None]:
# Shell command, not Python -- run it in a separate terminal BEFORE str1.start().
# It listens on port 8765, where socketTextStream connects; each line typed
# there becomes input to the stream:
#   nc -lk 8765
