# PySpark

PySpark is the Python API for Apache Spark, an analytical processing engine for large-scale, distributed data processing and machine learning applications.
Spark is often described as up to 100 times faster than traditional disk-based systems for in-memory workloads, and it can also process real-time (streaming) data.
It ships with built-in machine learning and graph processing libraries.
It supports Python 3.8 or newer.

# Apache Spark

Spark has a master-worker architecture: the driver acts as the master, and the workers run executors that carry out the tasks. When a Spark application runs, the driver program creates a SparkContext, which serves as the entry point to the application. The cluster manager is responsible for allocating cluster resources to the application.

## PySpark Modules and packages

- PySpark RDD (pyspark.RDD)
- PySpark DataFrame and SQL (pyspark.sql)
- PySpark Streaming (pyspark.streaming)
- PySpark MLlib (pyspark.ml, pyspark.mllib)
- PySpark GraphFrames (GraphFrames, distributed as a separate package)
- PySpark Resource (pyspark.resource), new in PySpark 3.0

In [1]:
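import findspark
findspark.init()  # locate the local Spark installation and make pyspark importable

import py4j  # Python-JVM bridge used internally by PySpark; importing it directly is optional
from pyspark.sql import SparkSession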

In [2]:
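# local[1]: run Spark locally with one thread; getOrCreate() reuses an existing session if there is one
spark = SparkSession.builder.master("local[1]").appName("SparkByExamples.com").getOrCreate()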

In [3]:
dataList = [("Java", 20000), ("Python", 100000), ("Scala", 3000)]

# parallelize() creates an RDD from a local Python collection
rdd=spark.sparkContext.parallelize(dataList)

# To create an RDD from a text file (one element per line)
#rdd = spark.sparkContext.textFile("file.txt")

In [4]:
rdd.count()

3

## RDD

PySpark RDD (Resilient Distributed Dataset) is the fundamental data structure of PySpark: a fault-tolerant, immutable, distributed collection of objects. Immutable means that once you create an RDD you cannot change it; transformations produce new RDDs instead. Each RDD is divided into logical partitions, which can be computed on different nodes of the cluster.

To create an RDD you need a SparkContext; when working with a SparkSession, it is available as spark.sparkContext.

    Note: There can be multiple SparkSessions but only one SparkContext per JVM.
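To make logical partitions concrete, here is a plain-Python sketch (not Spark's own code) of splitting a local list into a fixed number of contiguous partitions, roughly what happens when parallelize(data, numSlices) distributes a collection. The helper split_into_partitions is hypothetical and Spark's exact slicing may differ; on a real RDD, rdd.glom().collect() shows the actual contents of each partition.

```python
def split_into_partitions(data, num_slices):
    """Split a list into num_slices contiguous, roughly equal chunks.

    Plain-Python model of distributing a local collection across
    partitions; Spark's own slicing logic may differ in detail.
    """
    if num_slices < 1:
        raise ValueError("num_slices must be at least 1")
    n = len(data)
    partitions = []
    for i in range(num_slices):
        # Integer arithmetic spreads any remainder over the later chunks.
        start = i * n // num_slices
        end = (i + 1) * n // num_slices
        partitions.append(data[start:end])
    return partitions


parts = split_into_partitions(list(range(1, 11)), 3)
print(parts)  # [[1, 2, 3], [4, 5, 6], [7, 8, 9, 10]]
```

Each inner list would be processed by a separate task, possibly on a different node.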

## RDD operations

### RDD transformations
Transformations on an RDD return a new RDD and leave the original RDD unchanged. They are lazy: nothing is computed until an action is called on the RDD.
    Examples: map(), reduceByKey(), filter(), sortByKey(), flatMap()

### RDD actions
Actions trigger the computation and return a non-RDD value to the driver program.
    Examples: count(), collect(), reduce(), max(), first()
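The split between lazy transformations and eager actions can be modeled in a few lines of plain Python. This is a teaching sketch, not Spark's implementation (Spark compiles the recorded lineage into a DAG of stages and runs it on executors); the LazyPipeline class and its map/filter/collect/count methods are hypothetical stand-ins named after the RDD methods.

```python
class LazyPipeline:
    """Toy model of lazy RDD evaluation (not Spark's implementation).

    Transformations only record a step and return a NEW pipeline, so the
    original stays unchanged; actions replay the recorded steps.
    """

    def __init__(self, data, steps=()):
        self._data = list(data)
        self._steps = tuple(steps)

    # --- transformations (lazy) ---
    def map(self, func):
        return LazyPipeline(self._data, self._steps + (("map", func),))

    def filter(self, predicate):
        return LazyPipeline(self._data, self._steps + (("filter", predicate),))

    # --- actions (eager) ---
    def collect(self):
        result = list(self._data)
        for kind, func in self._steps:
            if kind == "map":
                result = [func(x) for x in result]
            else:  # "filter"
                result = [x for x in result if func(x)]
        return result

    def count(self):
        return len(self.collect())


calls = []  # records every element the map function actually sees


def double(x):
    calls.append(x)
    return x * 2


base = LazyPipeline([1, 2, 3, 4])
pipeline = base.map(double).filter(lambda x: x > 4)
calls_before_action = list(calls)   # [] : nothing has run yet
result = pipeline.collect()         # [6, 8] : the action runs both steps
print(calls_before_action, result, base.collect())  # [] [6, 8] [1, 2, 3, 4]
```

As in Spark without cache() or persist(), every action here replays the whole chain of transformations from the source data.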




## PySpark DataFrame

In [5]:
# Creating an empty RDD
emptyRDD = spark.sparkContext.emptyRDD() #alternative   emptyRDD = spark.sparkContext.parallelize([])
print(emptyRDD)

EmptyRDD[2] at emptyRDD at NativeMethodAccessorImpl.java:0


In [6]:
#Create a schema
from pyspark.sql.types import StructType,StructField, StringType
schema = StructType([
  StructField('firstname', StringType(), True),
  StructField('middlename', StringType(), True),
  StructField('lastname', StringType(), True)
  ])

#Creating an empty DataFrame without RDD
#df = spark.createDataFrame([],schema)

#Creating an empty DataFrame from an empty RDD
df = spark.createDataFrame(emptyRDD,schema)  #alternative    df1 = emptyRDD.toDF(schema)
df.printSchema()

root
 |-- firstname: string (nullable = true)
 |-- middlename: string (nullable = true)
 |-- lastname: string (nullable = true)



In [7]:
# Creating an empty DataFrame with an empty schema (no columns)
emptyDf = spark.createDataFrame([], StructType([]))
emptyDf.printSchema()

root



## PySpark DataFrame to Pandas DataFrame
For large datasets PySpark can be much faster than pandas because it processes data in parallel across multiple nodes, whereas pandas runs on a single machine. After processing a large dataset with PySpark, we might convert the (now smaller) result to a pandas DataFrame with toPandas(). Note that toPandas() collects all rows to the driver, so the result must fit in the driver's memory.

In [8]:
data = [("John","","Williams"),
        ("Karthick","","K",),
        ("Karthik","","R"),
        ]

columns = ["first_name","middle_name","last_name"]
pysparkDF = spark.createDataFrame(data = data, schema = columns)
pysparkDF.printSchema()
pysparkDF.show(truncate=False)

root
 |-- first_name: string (nullable = true)
 |-- middle_name: string (nullable = true)
 |-- last_name: string (nullable = true)

+----------+-----------+---------+
|first_name|middle_name|last_name|
+----------+-----------+---------+
|John      |           |Williams |
|Karthick  |           |K        |
|Karthik   |           |R        |
+----------+-----------+---------+



In [9]:
pandasDF = pysparkDF.toPandas()
print(pandasDF)

  first_name middle_name last_name
0       John              Williams
1   Karthick                     K
2    Karthik                     R


In [10]:
# show method of DataFrames

# dataFrame.show(n=20, truncate=True, vertical=False)

# n is the number of rows to display (20 by default); vertical=True prints each row as a record instead of a table row
# truncate=True (the default) cuts values longer than 20 characters; pass False to show full values, or an integer to set the width

pysparkDF.show(truncate=False, vertical=True) 

-RECORD 0---------------
 first_name  | John     
 middle_name |          
 last_name   | Williams 
-RECORD 1---------------
 first_name  | Karthick 
 middle_name |          
 last_name   | K        
-RECORD 2---------------
 first_name  | Karthik  
 middle_name |          
 last_name   | R        



## StructType
StructType is commonly used to define the schema when creating a DataFrame. By combining StructType and StructField, we can build complex schemas with nested structures. An explicit StructType schema ensures that the data is interpreted with the intended column names and types.

In [11]:
# Defining schema using nested StructType
from pyspark.sql.types import StructType,StructField, StringType, IntegerType, ArrayType, MapType

structureData = [
    (("James","","Smith"),"36636","M",3100),
    (("Michael","Rose",""),"40288","M",4300),
    (("Robert","","Williams"),"42114","M",1400),
    (("Maria","Anne","Jones"),"39192","F",5500),
    (("Jen","Mary","Brown"),"","F",-1)
  ]
structureSchema = StructType([
        StructField('name', StructType([
             StructField('firstname', StringType(), True),
             StructField('middlename', StringType(), True),
             StructField('lastname', StringType(), True)
             ])),
         StructField('id', StringType(), True),
         StructField('gender', StringType(), True),
         StructField('salary', IntegerType(), True)
         ])

df = spark.createDataFrame(data=structureData,schema=structureSchema)
df.printSchema()
df.show(truncate=False)

root
 |-- name: struct (nullable = true)
 |    |-- firstname: string (nullable = true)
 |    |-- middlename: string (nullable = true)
 |    |-- lastname: string (nullable = true)
 |-- id: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- salary: integer (nullable = true)

+--------------------+-----+------+------+
|name                |id   |gender|salary|
+--------------------+-----+------+------+
|{James, , Smith}    |36636|M     |3100  |
|{Michael, Rose, }   |40288|M     |4300  |
|{Robert, , Williams}|42114|M     |1400  |
|{Maria, Anne, Jones}|39192|F     |5500  |
|{Jen, Mary, Brown}  |     |F     |-1    |
+--------------------+-----+------+------+



In [12]:
#Adding and changing columns
from pyspark.sql.functions import col,struct,when
#adds column "Other"
updatedDF = df.withColumn("Other", 
    struct(col("id").alias("Id"),
    col("gender").alias("Gender"),
    col("salary").alias("Salary"),
    when(col("salary").cast(IntegerType()) < 2000,"Low")
      .when(col("salary").cast(IntegerType()) < 4000,"Medium")
      .otherwise("High").alias("Grade")
  )).drop("id","gender","salary")    #removes the columns id, gender and salary

updatedDF.printSchema()
updatedDF.show(truncate=False)

root
 |-- name: struct (nullable = true)
 |    |-- firstname: string (nullable = true)
 |    |-- middlename: string (nullable = true)
 |    |-- lastname: string (nullable = true)
 |-- Other: struct (nullable = false)
 |    |-- Id: string (nullable = true)
 |    |-- Gender: string (nullable = true)
 |    |-- Salary: integer (nullable = true)
 |    |-- Grade: string (nullable = false)

+--------------------+------------------------+
|name                |Other                   |
+--------------------+------------------------+
|{James, , Smith}    |{36636, M, 3100, Medium}|
|{Michael, Rose, }   |{40288, M, 4300, High}  |
|{Robert, , Williams}|{42114, M, 1400, Low}   |
|{Maria, Anne, Jones}|{39192, F, 5500, High}  |
|{Jen, Mary, Brown}  |{, F, -1, Low}          |
+--------------------+------------------------+



In [13]:
#Creating schema from json

import json
schemaFromJson = StructType.fromJson(json.loads(structureSchema.json()))
df = spark.createDataFrame(
        spark.sparkContext.parallelize(structureData),schemaFromJson)
df.printSchema()

root
 |-- name: struct (nullable = true)
 |    |-- firstname: string (nullable = true)
 |    |-- middlename: string (nullable = true)
 |    |-- lastname: string (nullable = true)
 |-- id: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- salary: integer (nullable = true)



## Column class in pyspark 
### (pyspark.sql.Column)

#### Accessing columns in data frame
col("column_name")<br>
df.column_name<br>
df["column_name"]<br>


#### Commonly used methods
<b>alias()</b> - assigns an alias (new name) to the column<br>
<b>asc()</b> and <b>desc()</b> - return a sort expression for ascending or descending order of the column<br>
<b>cast()</b> and <b>astype()</b> - convert the column to another data type<br>
<b>between()</b> - returns a boolean expression that is true when the column value lies between the lower and upper bounds (inclusive)<br>
<b>contains()</b> - checks whether a string column value contains the given substring<br>
<b>when()</b> and <b>otherwise()</b> - used like SQL CASE expressions (see code below)<br>
<b>getField()</b> - gets a field by name from a StructType column, or a value by key from a MapType column<br>
<b>getItem()</b> - gets an element by index from an ArrayType column, or a value by key from a MapType column<br>
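when() and otherwise() follow SQL CASE rules: conditions are checked in order, the first match wins, and a row that matches nothing gets the otherwise() value, or null when otherwise() is omitted. A plain-Python model of those rules, using the gender codes from the example below (case_when is a hypothetical helper, not a PySpark function):

```python
def case_when(value, branches, otherwise=lambda v: None):
    """Model of when(...).when(...).otherwise(...) for a single value.

    branches is a list of (predicate, result) pairs checked in order;
    the first predicate that holds decides the result. With no otherwise,
    unmatched values map to None, mirroring Spark's null.
    """
    for predicate, result in branches:
        if predicate(value):
            return result
    return otherwise(value)


gender_branches = [
    (lambda g: g == "M", "Male"),
    (lambda g: g == "F", "Female"),
    (lambda g: g == "", "Not Specified"),
]
codes = ["", "F", "T", "M"]

# otherwise(df.gender) in PySpark keeps the original value
with_otherwise = [case_when(g, gender_branches, otherwise=lambda g: g) for g in codes]
without_otherwise = [case_when(g, gender_branches) for g in codes]
print(with_otherwise)     # ['Not Specified', 'Female', 'T', 'Male']
print(without_otherwise)  # ['Not Specified', 'Female', None, 'Male']
```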


In [14]:
data=[("James","Bond","100",''),
      ("Anne","Hill","200",'F'),
      ("Tom","Cruise","400",'T'),
      ("Tom","Hardy","400",'M')] 
columns=["fname","lname","id","gender"]
df=spark.createDataFrame(data,columns)

df.select(df.fname,df.lname,when(df.gender=="M","Male") \
              .when(df.gender=="F","Female") \
              .when(df.gender=='' ,"Not Specified") \
              .otherwise(df.gender).alias("new_gender") \
    ).show()

+-----+------+-------------+
|fname| lname|   new_gender|
+-----+------+-------------+
|James|  Bond|Not Specified|
| Anne|  Hill|       Female|
|  Tom|Cruise|            T|
|  Tom| Hardy|         Male|
+-----+------+-------------+



In [15]:
data=[(("James","Bond"),["Java","C#"],{'hair':'black','eye':'brown'}),
      (("Ann","Varsa"),[".NET","Python"],{'hair':'brown','eye':'black'}),
      (("Tom Cruise",""),["Python","Scala"],{'hair':'red','eye':'grey'}),
      (("Tom Brand",None),["Perl","Ruby"],{'hair':'black','eye':'blue'})]

schema = StructType([
        StructField('name', StructType([
            StructField('fname', StringType(), True),
            StructField('lname', StringType(), True)])),
        StructField('languages', ArrayType(StringType()),True),
        StructField('properties', MapType(StringType(),StringType()),True)
     ])
df=spark.createDataFrame(data,schema)

df.select(df.properties.getField("hair")).show()

+----------------+
|properties[hair]|
+----------------+
|           black|
|           brown|
|             red|
|           black|
+----------------+



## DataFrame Methods

#### select
It is a transformation that returns a new DataFrame containing the selected columns.

In [16]:
# select (selecting columns from a DataFrame)

#select a single column
#df.select(col("name")).show()

#select multiple columns
#df.select(col("name"),col("languages")).show()

#select all columns from a list
#df.select([col for col in df.columns]).show()

#select specific column from nested struct
df.select("name.fname").show()

+----------+
|     fname|
+----------+
|     James|
|       Ann|
|Tom Cruise|
| Tom Brand|
+----------+



#### collect
It is an action that returns all rows of the DataFrame to the driver.<br>
In PySpark it returns a list of Row objects.<br>
Calling collect() on a large dataset can cause an OutOfMemory error on the driver; use take(n) or limit(n) to fetch only a few rows.

In [17]:
dept = [("Finance",10), \
    ("Marketing",20), \
    ("Sales",30), \
    ("IT",40) \
  ]
deptColumns = ["dept_name","dept_id"]
deptDF = spark.createDataFrame(data=dept, schema = deptColumns)

deptDetails = deptDF.collect()

print(deptDetails)

[Row(dept_name='Finance', dept_id=10), Row(dept_name='Marketing', dept_id=20), Row(dept_name='Sales', dept_id=30), Row(dept_name='IT', dept_id=40)]


#### withColumn
It is a transformation that returns a new DataFrame. It can be used to update the values of a column, add a new column, change the data type of a column, and so on.

In [18]:
from pyspark.sql.functions import lit

#Changing data type
deptDF.withColumn("dept_id",col("dept_id").cast("String")).show()

#Adding a column
deptDF.withColumn("Employee",lit(True)).show()

#Renaming a column
deptDF.withColumnRenamed("dept_name","department").show()

#To remove a column
#deptDF.drop("dept_name")

+---------+-------+
|dept_name|dept_id|
+---------+-------+
|  Finance|     10|
|Marketing|     20|
|    Sales|     30|
|       IT|     40|
+---------+-------+

+---------+-------+--------+
|dept_name|dept_id|Employee|
+---------+-------+--------+
|  Finance|     10|    true|
|Marketing|     20|    true|
|    Sales|     30|    true|
|       IT|     40|    true|
+---------+-------+--------+

+----------+-------+
|department|dept_id|
+----------+-------+
|   Finance|     10|
| Marketing|     20|
|     Sales|     30|
|        IT|     40|
+----------+-------+



#### filter
It is used to select the rows of a DataFrame that satisfy a condition. where() is an alias for filter().

In [19]:
#filter using column value
deptDF.filter(deptDF.dept_name == "Finance").show()

#filter using sql expression
deptDF.filter("dept_name == 'Finance'").show()

#filter using multiple conditions
deptDF.filter((deptDF.dept_name=="Finance") | (deptDF.dept_name=="IT")).show()

#filter using list values
accepted_depts = ["Finance","IT"]
deptDF.filter(deptDF.dept_name.isin(accepted_depts)).show()

#syntax for filter using array_contains
#from pyspark.sql.functions import array_contains
#df.filter(array_contains(df.col_name,value))

+---------+-------+
|dept_name|dept_id|
+---------+-------+
|  Finance|     10|
+---------+-------+

+---------+-------+
|dept_name|dept_id|
+---------+-------+
|  Finance|     10|
+---------+-------+

+---------+-------+
|dept_name|dept_id|
+---------+-------+
|  Finance|     10|
|       IT|     40|
+---------+-------+

+---------+-------+
|dept_name|dept_id|
+---------+-------+
|  Finance|     10|
|       IT|     40|
+---------+-------+



#### distinct() and dropDuplicates()
<b>distinct()</b> - returns a new DataFrame containing only the distinct rows (rows are compared on all columns).<br>
<b>dropDuplicates()</b> - without arguments, behaves the same as distinct(). It also accepts a list of column names, in which case rows are treated as duplicates when they match on those columns only.
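The difference between the two methods shows up with a column subset. Below is a plain-Python sketch of the rule on the same employee rows used in the next cell (drop_duplicates is a hypothetical helper): James/Sales/3000 is a full duplicate, while Saif/Sales/4100 only repeats Robert's department and salary. This sketch keeps the first row it sees for each key; Spark makes no such guarantee about which duplicate survives.

```python
def drop_duplicates(rows, columns, subset=None):
    """Plain-Python model of distinct() / dropDuplicates(subset).

    Rows are compared only on the columns in `subset` (all columns when
    subset is None); the first row seen for each key is kept.
    """
    key_positions = [columns.index(name) for name in (subset or columns)]
    seen = set()
    kept = []
    for row in rows:
        key = tuple(row[i] for i in key_positions)
        if key not in seen:
            seen.add(key)
            kept.append(row)
    return kept


columns = ["employee_name", "department", "salary"]
data = [("James", "Sales", 3000), ("Michael", "Sales", 4600),
        ("Robert", "Sales", 4100), ("Maria", "Finance", 3000),
        ("James", "Sales", 3000), ("Scott", "Finance", 3300),
        ("Jen", "Finance", 3900), ("Jeff", "Marketing", 3000),
        ("Kumar", "Marketing", 2000), ("Saif", "Sales", 4100)]

print(len(drop_duplicates(data, columns)))                                  # 9
print(len(drop_duplicates(data, columns, subset=["department", "salary"])))  # 8
```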

In [20]:
data = [("James", "Sales", 3000), \
    ("Michael", "Sales", 4600), \
    ("Robert", "Sales", 4100), \
    ("Maria", "Finance", 3000), \
    ("James", "Sales", 3000), \
    ("Scott", "Finance", 3300), \
    ("Jen", "Finance", 3900), \
    ("Jeff", "Marketing", 3000), \
    ("Kumar", "Marketing", 2000), \
    ("Saif", "Sales", 4100) \
  ]

columns= ["employee_name", "department", "salary"]
df = spark.createDataFrame(data = data, schema = columns)


#using distinct
print("Using distinct: ")
distinctDF = df.distinct()
print("Distinct count: ",str(distinctDF.count()))
distinctDF.show()

#using dropDuplicates
print("Using dropDuplicates: ")
distinctDF = df.dropDuplicates()
print("Distinct count: ",str(distinctDF.count()))
distinctDF.show()

#using dropDuplicates with column parameters
print("Using dropDuplicates with column parameters: ")
distinctDF = df.dropDuplicates(["department","salary"])
print("Distinct count: ",str(distinctDF.count()))
distinctDF.show()

Using distinct: 
Distinct count:  9
+-------------+----------+------+
|employee_name|department|salary|
+-------------+----------+------+
|          Jen|   Finance|  3900|
|      Michael|     Sales|  4600|
|        Scott|   Finance|  3300|
|        Kumar| Marketing|  2000|
|        James|     Sales|  3000|
|       Robert|     Sales|  4100|
|         Jeff| Marketing|  3000|
|         Saif|     Sales|  4100|
|        Maria|   Finance|  3000|
+-------------+----------+------+

Using dropDuplicates: 
Distinct count:  9
+-------------+----------+------+
|employee_name|department|salary|
+-------------+----------+------+
|          Jen|   Finance|  3900|
|      Michael|     Sales|  4600|
|        Scott|   Finance|  3300|
|        Kumar| Marketing|  2000|
|        James|     Sales|  3000|
|       Robert|     Sales|  4100|
|         Jeff| Marketing|  3000|
|         Saif|     Sales|  4100|
|        Maria|   Finance|  3000|
+-------------+----------+------+

Using dropDuplicates with column parameters: 

#### sort and orderBy
These methods sort the rows of a DataFrame by one or more columns (orderBy() is an alias of sort()). Rows are sorted in ascending order by default; call .asc() or .desc() on a column to set the order explicitly.

In [21]:
data = [("James","Sales","NY",90000,34,10000), \
    ("Michael","Sales","NY",86000,56,20000), \
    ("Robert","Sales","CA",81000,30,23000), \
    ("Maria","Finance","CA",90000,24,23000), \
    ("Raman","Finance","CA",99000,40,24000), \
    ("Scott","Finance","NY",83000,36,19000), \
    ("Jen","Finance","NY",79000,53,15000), \
    ("Jeff","Marketing","CA",80000,25,18000), \
    ("Kumar","Marketing","NY",91000,50,21000) \
  ]
columns= ["employee_name","department","state","salary","age","bonus"]

df = spark.createDataFrame(data = data, schema = columns)

#sorting based on department and state in ascending order(by default)
df.sort("department","state").show(truncate=False)

#sorting based on department in descending order
df.orderBy(col("department").desc()).show(truncate=False)

+-------------+----------+-----+------+---+-----+
|employee_name|department|state|salary|age|bonus|
+-------------+----------+-----+------+---+-----+
|Maria        |Finance   |CA   |90000 |24 |23000|
|Raman        |Finance   |CA   |99000 |40 |24000|
|Jen          |Finance   |NY   |79000 |53 |15000|
|Scott        |Finance   |NY   |83000 |36 |19000|
|Jeff         |Marketing |CA   |80000 |25 |18000|
|Kumar        |Marketing |NY   |91000 |50 |21000|
|Robert       |Sales     |CA   |81000 |30 |23000|
|James        |Sales     |NY   |90000 |34 |10000|
|Michael      |Sales     |NY   |86000 |56 |20000|
+-------------+----------+-----+------+---+-----+

+-------------+----------+-----+------+---+-----+
|employee_name|department|state|salary|age|bonus|
+-------------+----------+-----+------+---+-----+
|James        |Sales     |NY   |90000 |34 |10000|
|Robert       |Sales     |CA   |81000 |30 |23000|
|Michael      |Sales     |NY   |86000 |56 |20000|
|Jeff         |Marketing |CA   |80000 |25 |18000|

#### groupBy
Groups rows that share the same values in one or more columns. It is usually followed by aggregate functions such as sum(), avg() or max(), applied directly or through agg().

In [22]:
# Note: importing sum and max this way shadows Python's built-in sum() and max()
from pyspark.sql.functions import col,sum,avg,max

simpleData = [("James","Sales","NY",90000,34,10000),
    ("Michael","Sales","NY",86000,56,20000),
    ("Robert","Sales","CA",81000,30,23000),
    ("Maria","Finance","CA",90000,24,23000),
    ("Raman","Finance","CA",99000,40,24000),
    ("Scott","Finance","NY",83000,36,19000),
    ("Jen","Finance","NY",79000,53,15000),
    ("Jeff","Marketing","CA",80000,25,18000),
    ("Kumar","Marketing","NY",91000,50,21000)
  ]

schema = ["employee_name","department","state","salary","age","bonus"]
df = spark.createDataFrame(data=simpleData, schema = schema)

#sum of salary of employees in each dept
df.groupBy("department").sum("salary").show(truncate=False)

#having multiple aggregate functions
df.groupBy("department") \
    .agg(sum("salary").alias("sum_salary"), \
         avg("salary").alias("avg_salary"), \
         sum("bonus").alias("sum_bonus"), \
         max("bonus").alias("max_bonus") \
     ) \
    .show(truncate=False)

+----------+-----------+
|department|sum(salary)|
+----------+-----------+
|Sales     |257000     |
|Finance   |351000     |
|Marketing |171000     |
+----------+-----------+

+----------+----------+-----------------+---------+---------+
|department|sum_salary|avg_salary       |sum_bonus|max_bonus|
+----------+----------+-----------------+---------+---------+
|Sales     |257000    |85666.66666666667|53000    |23000    |
|Finance   |351000    |87750.0          |81000    |24000    |
|Marketing |171000    |85500.0          |39000    |21000    |
+----------+----------+-----------------+---------+---------+



#### join
This operation works like a JOIN in SQL: it combines the rows of two DataFrames based on a join condition, typically matching key columns.

#### Join types
<b>Inner Join:</b> Returns only the rows with matching keys in both DataFrames.<br>
<b>Left Join:</b> Returns all rows from the left DataFrame and the matching rows from the right DataFrame (null where there is no match).<br>
<b>Right Join:</b> Returns all rows from the right DataFrame and the matching rows from the left DataFrame (null where there is no match).<br>
<b>Full Outer Join:</b> Returns all rows from both DataFrames, pairing matching rows and filling the other side with null for non-matching rows.<br>
<b>Left Semi Join:</b> Returns the rows from the left DataFrame that have a match in the right DataFrame, with only the left DataFrame's columns.<br>
<b>Left Anti Join:</b> Returns the rows from the left DataFrame that have no match in the right DataFrame.
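The join types can be compared with a small plain-Python model on a single key column. The join helper and the emp/dept rows are hypothetical; real Spark joins also differ in column layout and do not guarantee row order. In PySpark the corresponding call is DataFrame.join(other, on, how), where how takes values such as "inner", "left", "right", "full", "left_semi" and "left_anti".

```python
def join(left, right, how="inner"):
    """Plain-Python model of join types on one key column.

    left and right are lists of (key, value) tuples. The side with no
    match is filled with None, the way Spark fills it with null.
    """
    valid = {"inner", "left", "right", "full", "left_semi", "left_anti"}
    if how not in valid:
        raise ValueError(f"unsupported join type: {how}")
    left_keys = {k for k, _ in left}
    right_keys = {k for k, _ in right}
    # Semi and anti joins only filter the left side; no right columns are added.
    if how == "left_semi":
        return [row for row in left if row[0] in right_keys]
    if how == "left_anti":
        return [row for row in left if row[0] not in right_keys]
    out = []
    for lk, lv in left:
        matches = [rv for rk, rv in right if rk == lk]
        if matches:
            out.extend((lk, lv, rv) for rv in matches)
        elif how in ("left", "full"):
            out.append((lk, lv, None))  # unmatched left row, right side null
    if how in ("right", "full"):
        # unmatched right rows, left side null
        out.extend((rk, None, rv) for rk, rv in right if rk not in left_keys)
    return out


emp = [(10, "Smith"), (20, "Rose"), (50, "Brown")]       # (dept_id, name)
dept = [(10, "Finance"), (20, "Marketing"), (30, "Sales")]  # (dept_id, dept_name)
for how in ["inner", "left", "right", "full", "left_semi", "left_anti"]:
    print(how, join(emp, dept, how))
```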

#### union
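It combines the rows of two DataFrames that have the same schema. Columns are matched by position, not by name, and duplicate rows are kept (like SQL UNION ALL); apply distinct() afterwards to remove them.<br>
unionAll() was deprecated in PySpark 2.0.0; use union() instead.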

#### unionByName
unionByName() merges two DataFrames by column name rather than by position, so it can combine DataFrames whose columns have the same names in a different order. Since Spark 3.1 it also supports columns that are missing on one side: set allowMissingColumns=True and the missing columns are filled with null.
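The practical difference between union() and unionByName() is how columns are aligned. This plain-Python sketch models both on rows stored as tuples with separate column-name lists; union and union_by_name are hypothetical helpers, and allow_missing_columns mirrors the allowMissingColumns parameter described above.

```python
def union(rows_a, rows_b):
    """Model of DataFrame.union(): rows are appended by POSITION and
    duplicates are kept (SQL UNION ALL semantics)."""
    return list(rows_a) + list(rows_b)


def union_by_name(cols_a, rows_a, cols_b, rows_b, allow_missing_columns=False):
    """Model of DataFrame.unionByName(): columns are matched by NAME.

    Returns (column_names, rows). With allow_missing_columns=True, a column
    present on only one side is kept and filled with None (null in Spark).
    """
    if allow_missing_columns:
        out_cols = list(cols_a) + [c for c in cols_b if c not in cols_a]
    elif set(cols_a) != set(cols_b):
        raise ValueError("column names differ; pass allow_missing_columns=True")
    else:
        out_cols = list(cols_a)
    out_rows = []
    for cols, rows in ((cols_a, rows_a), (cols_b, rows_b)):
        for row in rows:
            record = dict(zip(cols, row))
            out_rows.append(tuple(record.get(c) for c in out_cols))
    return out_cols, out_rows


cols_a, rows_a = ["name", "dept"], [("James", "Sales")]
cols_b, rows_b = ["dept", "name"], [("Finance", "Maria")]  # same columns, other order
cols_c, rows_c = ["name", "salary"], [("Jen", 3900)]       # "dept" is missing

print(union(rows_a, rows_b))  # [('James', 'Sales'), ('Finance', 'Maria')]: misaligned
print(union_by_name(cols_a, rows_a, cols_b, rows_b))
print(union_by_name(cols_a, rows_a, cols_c, rows_c, allow_missing_columns=True))
```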

#### udf - User Defined Functions
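A UDF wraps a regular Python function so that it can be applied to DataFrame columns and, once registered with spark.udf.register(), used in SQL queries. Python UDFs are a black box to Spark's optimizer and require moving data between the JVM and Python, so prefer a built-in function when one exists.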

In [23]:
def convertCase(text):
    # Capitalise the first letter of each space-separated word.
    # Returns None for null input so the UDF does not fail on null rows.
    if text is None:
        return None
    return " ".join(word[0:1].upper() + word[1:] for word in text.split(" "))

In [24]:
from pyspark.sql.functions import col, udf
from pyspark.sql.types import StringType

# Converting the Python function to a UDF
convertUDF = udf(convertCase, StringType())

# To register the function for use in SQL queries
#spark.udf.register("convertUDF", convertCase, StringType())

# UDF with the decorator syntax

#@udf(returnType=StringType())
#def user_function():
#    return something

In [25]:
columns = ["Seqno","Name"]
data = [("1", "john jones"),
    ("2", "tracey smith"),
    ("3", "amy sanders")]

df = spark.createDataFrame(data=data,schema=columns)

df.select(col("Seqno"),convertUDF(col("Name")).alias("Name")).show()

+-----+------------+
|Seqno|        Name|
+-----+------------+
|    1|  John Jones|
|    2|Tracey Smith|
|    3| Amy Sanders|
+-----+------------+



#### DataFrame.transform()
DataFrame.transform(func) applies a function that takes a DataFrame and returns a DataFrame. Because it returns the result, calls can be chained to apply several custom transformations in sequence.

#### sql.functions.transform()
pyspark.sql.functions.transform() (available since Spark 3.1) transforms an ArrayType column: it takes a function as a parameter, applies it to every element of each array, and returns a new array column.

In [26]:
simpleData = (("Java",4000,5), \
    ("Python", 4600,10),  \
    ("Scala", 4100,15),   \
    ("Scala", 4500,15),   \
    ("PHP", 3000,20),  \
  )
columns= ["CourseName", "fee", "discount"]

df = spark.createDataFrame(data = simpleData, schema = columns)

def reduce_price(df,reduceBy=1000):
    return df.withColumn("new_fee",df.fee - reduceBy)

def apply_discount(df):
    return df.withColumn("discounted_fee", df.new_fee - (df.new_fee * df.discount) / 100)

# transform() 
transformedDF = df.transform(reduce_price) \
        .transform(apply_discount)
                
transformedDF.show()

# Create DataFrame with Array
data = [
 ("James,,Smith",["Java","Scala","C++"],["Spark","Java"]),
 ("Michael,Rose,",["Spark","Java","C++"],["Spark","Java"]),
 ("Robert,,Williams",["CSharp","VB"],["Spark","Python"])
]
df = spark.createDataFrame(data=data,schema=["Name","Languages1","Languages2"])


# using sql.functions.transform() 
from pyspark.sql.functions import upper
from pyspark.sql.functions import transform
df.select(transform("Languages1", lambda x: upper(x)).alias("languages1")).show()

+----------+----+--------+-------+--------------+
|CourseName| fee|discount|new_fee|discounted_fee|
+----------+----+--------+-------+--------------+
|      Java|4000|       5|   3000|        2850.0|
|    Python|4600|      10|   3600|        3240.0|
|     Scala|4100|      15|   3100|        2635.0|
|     Scala|4500|      15|   3500|        2975.0|
|       PHP|3000|      20|   2000|        1600.0|
+----------+----+--------+-------+--------------+

+------------------+
|        languages1|
+------------------+
|[JAVA, SCALA, C++]|
|[SPARK, JAVA, C++]|
|      [CSHARP, VB]|
+------------------+



#### map
It is an RDD transformation that applies a function (often a lambda) to every element of the RDD and returns a new RDD with the results. DataFrames do not have map(), so convert a DataFrame to an RDD first with df.rdd.

In [27]:
data = [('Dinesh','Thakur','M',30),
  ('Karthick','K','M',26),
  ('John','Williams','M',37), 
]

columns = ["firstname","lastname","gender","age"]
df = spark.createDataFrame(data=data, schema = columns)

# Accessing columns by index on each Row
# map() is only available on RDDs, so convert the DataFrame with df.rdd first
rdd2=df.rdd.map(lambda x: 
    (x[0]+","+x[1],x[2],x[3])
    )  
df2=rdd2.toDF(["name","gender","age"]   )
df2.show()

+-------------+------+---+
|         name|gender|age|
+-------------+------+---+
|Dinesh,Thakur|     M| 30|
|   Karthick,K|     M| 26|
|John,Williams|     M| 37|
+-------------+------+---+



#### flatMap
flatMap() applies a function to every element of an RDD, where the function returns an iterable (zero or more values) for each element, and flattens all the results into a single new RDD.
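map() produces exactly one output element per input element, while flatMap() lets each element produce zero or more outputs and flattens them. The plain-Python analogy below uses a list comprehension for map and itertools.chain for flatMap on two of the sample lines from the next cell (this is not Spark code).

```python
from itertools import chain

lines = ["Project Gutenberg's", "Alice's Adventures in Wonderland"]

# map-like: one output per input, so each line becomes one list of words
mapped = [line.split(" ") for line in lines]

# flatMap-like: each line yields several words, flattened into one sequence
flat_mapped = list(chain.from_iterable(line.split(" ") for line in lines))

print(mapped)       # [['Project', "Gutenberg's"], ["Alice's", 'Adventures', 'in', 'Wonderland']]
print(flat_mapped)  # ['Project', "Gutenberg's", "Alice's", 'Adventures', 'in', 'Wonderland']
```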

In [28]:
data = ["Project Gutenberg’s",
        "Alice’s Adventures in Wonderland",
        "Project Gutenberg’s",
        "Adventures in Wonderland",
        "Project Gutenberg’s"]
rdd=spark.sparkContext.parallelize(data)

rdd2=rdd.flatMap(lambda x: x.split(" "))
for element in rdd2.collect():
    print(element)

Project
Gutenberg’s
Alice’s
Adventures
in
Wonderland
Project
Gutenberg’s
Adventures
in
Wonderland
Project
Gutenberg’s


#### foreach
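It is an action that applies a function to every element of a DataFrame or RDD for its side effects (for example, writing to an external system). It returns nothing, and the function runs on the executors rather than on the driver.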

#### sample
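syntax - .sample(withReplacement, fraction, seed=None)<br>
It returns a random sample of a DataFrame or RDD. fraction is the expected share of rows, so the sample size is approximate rather than exact.

#### sampleBy
syntax - .sampleBy(col, fractions, seed=None)<br>
It returns a stratified sample of a DataFrame: fractions is a dict that maps each value (stratum) of col to the sampling fraction for that stratum, and strata that are not listed are treated as fraction 0.

#### takeSample
syntax - .takeSample(withReplacement, num, seed=None)<br>
It is an RDD action that returns a sample of size num as a Python list on the driver, so num should stay small.

For DataFrame.sample(), withReplacement defaults to False. Passing the same seed reproduces the same sample (for the same data and partitioning).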
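Sampling without replacement keeps each element independently with probability fraction (Bernoulli sampling), which is why the sample size is only approximately fraction times the number of rows. A plain-Python sketch of that idea; bernoulli_sample is hypothetical, and it uses Python's random module, so it will not pick the same rows as Spark for the same seed.

```python
import random


def bernoulli_sample(data, fraction, seed=None):
    """Keep each element independently with probability `fraction`.

    Models sample(withReplacement=False, fraction, seed): the result size is
    only approximately fraction * len(data), and a fixed seed makes the
    sample reproducible. Python's RNG is used, so the chosen elements will
    not match the ones Spark picks for the same seed.
    """
    if not 0.0 <= fraction <= 1.0:
        raise ValueError("fraction must be in [0, 1] when sampling without replacement")
    rng = random.Random(seed)
    return [x for x in data if rng.random() < fraction]


data = list(range(1000))
sample_a = bernoulli_sample(data, 0.1, seed=42)
sample_b = bernoulli_sample(data, 0.1, seed=42)
print(len(sample_a))          # close to 100, but generally not exactly 100
print(sample_a == sample_b)   # True: same seed, same sample
```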

#### fill and fillna
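DataFrame.fillna() and DataFrame.na.fill() are aliases of each other. They replace null values with the value passed as a parameter.<br>
syntax - .fillna(value, subset=None)<br>
The subset parameter restricts filling to the listed columns. Only columns whose type matches the type of value are filled, and value can also be a dict that maps column names to replacement values.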
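A plain-Python sketch of how subset limits which columns are filled, with rows modeled as dicts (fillna here is a hypothetical helper). It deliberately skips Spark's type check, under which only columns whose type matches the replacement value are filled, and it does not model passing a dict as value.

```python
def fillna(rows, value, subset=None):
    """Model of DataFrame.fillna(value, subset) on a list of dict rows.

    Replaces None (null) with `value` in the columns listed in subset,
    or in every column when subset is None. Input rows are not modified.
    """
    filled = []
    for row in rows:
        new_row = dict(row)  # copy so the caller's rows stay unchanged
        targets = subset if subset is not None else list(row)
        for name in targets:
            if name in new_row and new_row[name] is None:
                new_row[name] = value
        filled.append(new_row)
    return filled


rows = [{"city": None, "zip": "704"}, {"city": "PARC PARQUE", "zip": None}]
print(fillna(rows, "unknown", subset=["city"]))  # only city is filled
print(fillna(rows, ""))                          # every null is filled
```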

#### pivot
It rotates the distinct values of one column into multiple columns. It is used after groupBy() together with an aggregation such as sum().

In [29]:
data = [("Banana",1000,"USA"), ("Carrots",1500,"USA"), ("Beans",1600,"USA"), \
      ("Orange",2000,"USA"),("Orange",2000,"USA"),("Banana",400,"China"), \
      ("Carrots",1200,"China"),("Beans",1500,"China"),("Orange",4000,"China"), \
      ("Banana",2000,"Canada"),("Carrots",2000,"Canada"),("Beans",2000,"Mexico")]

columns= ["Product","Amount","Country"]
df = spark.createDataFrame(data = data, schema = columns)
df.printSchema()
df.show(truncate=False)

pivotDF = df.groupBy("Product").pivot("Country").sum("Amount")
pivotDF.printSchema()
pivotDF.show(truncate=False)

root
 |-- Product: string (nullable = true)
 |-- Amount: long (nullable = true)
 |-- Country: string (nullable = true)

+-------+------+-------+
|Product|Amount|Country|
+-------+------+-------+
|Banana |1000  |USA    |
|Carrots|1500  |USA    |
|Beans  |1600  |USA    |
|Orange |2000  |USA    |
|Orange |2000  |USA    |
|Banana |400   |China  |
|Carrots|1200  |China  |
|Beans  |1500  |China  |
|Orange |4000  |China  |
|Banana |2000  |Canada |
|Carrots|2000  |Canada |
|Beans  |2000  |Mexico |
+-------+------+-------+

root
 |-- Product: string (nullable = true)
 |-- Canada: long (nullable = true)
 |-- China: long (nullable = true)
 |-- Mexico: long (nullable = true)
 |-- USA: long (nullable = true)

+-------+------+-----+------+----+
|Product|Canada|China|Mexico|USA |
+-------+------+-----+------+----+
|Orange |null  |4000 |null  |4000|
|Beans  |null  |1500 |2000  |1600|
|Banana |2000  |400  |null  |1000|
|Carrots|2000  |1200 |null  |1500|
+-------+------+-----+------+----+



## PySpark SQL Functions

### Aggregate functions

approx_count_distinct<br>
avg<br>
collect_list<br>
collect_set<br>
countDistinct<br>
count<br>
grouping<br>
first<br>
last<br>
kurtosis<br>
max<br>
min<br>
mean<br>
skewness<br>
stddev<br>
stddev_samp<br>
stddev_pop<br>
sum<br>
sumDistinct (deprecated in Spark 3.2 in favor of sum_distinct)<br>
variance, var_samp, var_pop<br>
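Two of the listed functions are easy to confuse: collect_list keeps every value in a group, including duplicates, while collect_set keeps each distinct value once; both skip nulls. A plain-Python model over the department/salary pairs from the dropDuplicates example, with one null salary added for illustration (collect_list_and_set is a hypothetical helper):

```python
from collections import defaultdict


def collect_list_and_set(rows, key, value):
    """Model groupBy(key).agg(collect_list(value), collect_set(value)).

    Returns {group: (list_of_values, set_of_values)}. Nulls are skipped.
    Spark does not guarantee element order, so compare sorted results.
    """
    grouped = defaultdict(list)
    for row in rows:
        bucket = grouped[row[key]]  # create the group even if every value is null
        if row[value] is not None:
            bucket.append(row[value])
    return {k: (vals, set(vals)) for k, vals in grouped.items()}


pairs = [("Sales", 3000), ("Sales", 4600), ("Sales", 4100), ("Finance", 3000),
         ("Sales", 3000), ("Finance", 3300), ("Finance", 3900),
         ("Marketing", 3000), ("Marketing", 2000), ("Sales", 4100),
         ("Finance", None)]
rows = [{"department": d, "salary": s} for d, s in pairs]

result = collect_list_and_set(rows, "department", "salary")
for dept, (as_list, as_set) in result.items():
    # len(as_list) matches count(salary); len(as_set) matches countDistinct(salary)
    print(dept, as_list, sorted(as_set))
```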

### Window functions

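<b>row_number()</b> gives the sequential number of each row within the Window partition, starting at 1<br>
<b>rank()</b> gives the rank of each row within the Window partition, leaving gaps in the ranking after ties<br>
<b>dense_rank()</b> is the same as rank() but does not leave gaps in the ranking when there is a tie<br>
<b>percent_rank()</b> gives the relative rank of each row as (rank - 1) / (number of rows in the partition - 1)<br>
<b>ntile(n)</b> splits the rows of the Window partition into n roughly equal buckets and returns the bucket number of each row<br>
<b>cume_dist()</b> returns the cumulative distribution of values: the fraction of rows whose value is less than or equal to the current row's value<br>
<b>lead()</b> and <b>lag()</b> return the value from a following or preceding row, as in SQL<br>
aggregate functions such as sum, avg, etc. can also be computed over a window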
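Within one window partition the ranking functions differ only in how they handle ties. The plain-Python sketch below computes them for a single partition ordered by salary (ranking_functions is a hypothetical helper); in PySpark the same values come from expressions like rank().over(Window.partitionBy("department").orderBy("salary")), with Window imported from pyspark.sql.window. Spark assigns row_number() among tied rows in no guaranteed order.

```python
def ranking_functions(values):
    """Compute row_number, rank, dense_rank, percent_rank and cume_dist
    for ONE window partition ordered by value (ascending).

    Returns a list of (value, row_number, rank, dense_rank,
    percent_rank, cume_dist) tuples.
    """
    ordered = sorted(values)
    n = len(ordered)
    rows = []
    dense_rank = 0
    previous = object()  # sentinel that never equals a real value
    for i, v in enumerate(ordered):
        row_number = i + 1               # unique, even for ties
        rank = ordered.index(v) + 1      # position of the first tied row
        if v != previous:                # new value: next dense rank
            dense_rank += 1
            previous = v
        percent_rank = (rank - 1) / (n - 1) if n > 1 else 0.0
        cume_dist = sum(1 for x in ordered if x <= v) / n
        rows.append((v, row_number, rank, dense_rank, percent_rank, cume_dist))
    return rows


for row in ranking_functions([4600, 3000, 4100, 3000]):
    print(row)
```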


### SQL Date and Timestamp functions

#### Some date functions
current_date()<br>
date_format(dateExpr,format)<br>
to_date(column)<br>
to_date(column, fmt)<br>
add_months(column, numMonths)<br>
date_add(column, days)<br>
date_sub(column, days)<br>
datediff(end, start)<br>
months_between(end, start)<br>
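The argument order of datediff() is easy to get backwards: the end date comes first. This plain-Python sketch uses datetime to model datediff() and date_add(); the helper names mirror the Spark functions but are hypothetical, and they operate on Python date objects rather than DataFrame columns.

```python
from datetime import date, timedelta


def datediff(end, start):
    """Number of days from start to end (positive when end is later)."""
    return (end - start).days


def date_add(d, days):
    """The date `days` days after d; date_sub(d, n) is date_add(d, -n)."""
    return d + timedelta(days=days)


print(datediff(date(2024, 3, 1), date(2024, 2, 1)))  # 29, since 2024 is a leap year
print(datediff(date(2024, 2, 1), date(2024, 3, 1)))  # -29: arguments swapped
print(date_add(date(2024, 2, 28), 1))                # 2024-02-29
```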

#### Some timestamp functions
current_timestamp()<br>
hour(column)<br>
minute(column)<br>
second(column)<br>
to_timestamp(column)<br>
to_timestamp(column, format)<br>

### json functions

from_json() - converts a JSON string column to a StructType, MapType or ArrayType column<br>
to_json() - converts a StructType, MapType or ArrayType column to a JSON string<br>
json_tuple() - extracts the given fields from a JSON string column and returns each one as a new column<br>
get_json_object() - extracts a JSON value from a JSON string column using a JSON path such as $.City<br>
schema_of_json() - infers a schema, as a DDL-formatted string, from a JSON string<br>

In [30]:
from pyspark.sql.types import MapType,StringType
from pyspark.sql.functions import from_json, to_json, json_tuple

jsonString="""{"Zipcode":704,"ZipCodeType":"STANDARD","City":"PARC PARQUE","State":"PR"}"""
df=spark.createDataFrame([(1, jsonString)],["id","value"])
df.show(truncate=False)

print("using from_json")
df2=df.withColumn("value",from_json(df.value,MapType(StringType(),StringType())))
df2.printSchema()
df2.show(truncate=False)

print("using to_json")
df2.withColumn("value",to_json(col("value"))) \
   .show(truncate=False)

print("using json_tuple")
df.select(col("id"),json_tuple(col("value"),"Zipcode","ZipCodeType","City")) \
    .toDF("id","Zipcode","ZipCodeType","City") \
    .show(truncate=False)

+---+--------------------------------------------------------------------------+
|id |value                                                                     |
+---+--------------------------------------------------------------------------+
|1  |{"Zipcode":704,"ZipCodeType":"STANDARD","City":"PARC PARQUE","State":"PR"}|
+---+--------------------------------------------------------------------------+

using from_json
root
 |-- id: long (nullable = true)
 |-- value: map (nullable = true)
 |    |-- key: string
 |    |-- value: string (valueContainsNull = true)

+---+---------------------------------------------------------------------------+
|id |value                                                                      |
+---+---------------------------------------------------------------------------+
|1  |{Zipcode -> 704, ZipCodeType -> STANDARD, City -> PARC PARQUE, State -> PR}|
+---+---------------------------------------------------------------------------+

using to_json
+---+---

## PySpark Data sources

### Read and Write csv files
df = spark.read.csv("filename") - read a csv file<br>
df = spark.read.option("header", True).csv("filename") - treat the first row as column names<br>
df = spark.read.csv(["path1", "path2", "path3"]) - read multiple csv files (pass a list of paths)<br>
df = spark.read.csv("folder_path") - read all csv files in a folder<br>

df = spark.read.options(header='True', inferSchema='True', delimiter=',') \
  .csv("/tmp/resources/zipcodes.csv") - setting multiple options at once<br>
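Note - inferSchema makes Spark infer each column's type from the values in the csv file. Without it, every column is read as a string. Inference costs an extra pass over the data. delimiter is the character that separates the columns<br>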

df_with_schema = spark.read.format("csv").option("header", True).schema(user_schema).load("/tmp/resources/zipcodes.csv") - reading a csv file with a user-defined schema, where user_schema is a StructType<br>

df2.write.options(header='True', delimiter=',').csv("/tmp/spark_output/zipcodes") - writing the data frame to a csv file<br>

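Save modes, set with df.write.mode("..."):<br>
overwrite - replace the existing data<br>
append - add the new data to the existing data<br>
ignore - silently skip the write if data already exists<br>
error / errorifexists (default) - raise an error if data already exists<br><br><br>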

### Read and Write parquet files

Same as for csv files. Replace csv with parquet. Parquet files store their own schema, so the header and inferSchema options are not needed.

Creating a temporary view on a Parquet file -<br>
spark.sql("CREATE TEMPORARY VIEW PERSON USING parquet OPTIONS (path \"/tmp/output/people.parquet\")")<br>
spark.sql("SELECT * FROM PERSON").show()<br>

In [31]:
data = [("Banana",1000,"USA"), ("Carrots",1500,"USA"), ("Beans",1600,"USA"), \
      ("Orange",2000,"USA"),("Orange",2000,"USA"),("Banana",400,"China"), \
      ("Carrots",1200,"China"),("Beans",1500,"China"),("Orange",4000,"China"), \
      ("Banana",2000,"Canada"),("Carrots",2000,"Canada"),("Beans",2000,"Mexico")]

columns= ["Product","Amount","Country"]
df = spark.createDataFrame(data = data, schema = columns)
df.write.mode('overwrite').parquet("sample.parquet")
spark.sql("CREATE TEMPORARY VIEW SAMPLE USING parquet OPTIONS (path \"sample.parquet\")")
spark.sql("SELECT * FROM SAMPLE").show()

+-------+------+-------+
|Product|Amount|Country|
+-------+------+-------+
| Banana|  1000|    USA|
|Carrots|  1500|    USA|
|  Beans|  1600|    USA|
| Orange|  2000|    USA|
| Orange|  2000|    USA|
| Banana|   400|  China|
|Carrots|  1200|  China|
|  Beans|  1500|  China|
| Orange|  4000|  China|
| Banana|  2000| Canada|
|Carrots|  2000| Canada|
|  Beans|  2000| Mexico|
+-------+------+-------+



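### Read and Write JSON files
Same as for parquet. Replace parquet with json. By default Spark expects one JSON object per line. Add .option("multiLine", True) to read files in which a record spans several lines. You can also create tables on json files in a similar way.<br><br><br>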

### Reading a Hive table
<b>Enable hive support while creating spark session</b><br>
from os.path import abspath<br>
warehouse_location = abspath('spark-warehouse')<br>
spark = SparkSession \ <br>
    .builder \ <br>
    .appName("SparkByExamples.com") \ <br>
    .config("spark.sql.warehouse.dir", warehouse_location) \ <br>
    .enableHiveSupport() \ <br>
    .getOrCreate()<br>
    

To read a hive table -<br>
df = spark.sql("select * from emp.employee") or df = spark.read.table("emp.employee")<br>
df.show()<br><br><br>




### Save DataFrame to Hive table

#### Creating a Hive Internal Table -<br>
df.write.mode('overwrite').saveAsTable("table_name")<br>

#### Creating a Hive Internal Table inside a database<br>
spark.sql("CREATE DATABASE IF NOT EXISTS database_name")<br>
df.write.mode('overwrite').saveAsTable("database_name.table_name")<br>

#### Creating a Hive External Table
df.write.mode("overwrite").option("path", "/path/to/external/table").saveAsTable("database_name.table_name")<br><br><br>

### Read JDBC in parallel

#### Reading a JDBC table (MySQL connector jar added through spark.jars)
spark = SparkSession.builder \ <br>
           .appName('SparkByExamples.com') \ <br>
           .config("spark.jars", "mysql-connector-java-8.0.13.jar") \ <br>
           .getOrCreate()<br>
<br>
df = spark.read \ <br>
    .jdbc("jdbc:mysql://localhost:3306/emp", "employee", \ <br>
          properties={"user": "root", "password": "root", "driver":"com.mysql.cj.jdbc.Driver"})<br><br>

df.show()<br><br>

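#### Read Table in Parallel
Note - numPartitions on its own does not split the read. Spark issues one query per partition only when partitionColumn, lowerBound and upperBound are also set, and partitionColumn must be a numeric, date or timestamp column. numPartitions also caps the number of concurrent JDBC connections.<br>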
df = spark.read \ <br>
    .format("jdbc") \ <br>
    .option("driver","com.mysql.cj.jdbc.Driver") \ <br>
    .option("url", "jdbc:mysql://localhost:3306/emp") \ <br>
    .option("dbtable","employee") \ <br>
    .option("numPartitions",5) \ <br>
    .option("user", "root") \ <br>
    .option("password", "root") \ <br>
    .load() <br><br>
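To show how a partitioned JDBC read is split, here is a simplified approximation of how Spark turns partitionColumn, lowerBound, upperBound and numPartitions into one WHERE clause per partition. jdbc_partition_predicates is a hypothetical helper written for illustration. It assumes integer bounds with 0 <= lowerBound <= upperBound and mirrors only the general shape of Spark's internal logic. The bounds set only the stride and do not filter rows: the first partition also picks up everything below the stride and NULLs, and the last picks up everything above.

```python
def jdbc_partition_predicates(column, lower_bound, upper_bound, num_partitions):
    """Approximate the per-partition WHERE clauses of a partitioned JDBC read.

    Hypothetical helper for illustration only. Assumes integer bounds with
    0 <= lower_bound <= upper_bound; Spark's real implementation also handles
    dates, timestamps and other edge cases.
    """
    if num_partitions <= 1 or lower_bound == upper_bound:
        return [None]  # one partition, no WHERE clause: the whole table in one query
    # Never create more partitions than there are values in the range
    num_partitions = min(num_partitions, upper_bound - lower_bound)
    stride = upper_bound // num_partitions - lower_bound // num_partitions

    predicates = []
    current = lower_bound
    for i in range(num_partitions):
        lower = f"{column} >= {current}" if i != 0 else None
        current += stride
        upper = f"{column} < {current}" if i != num_partitions - 1 else None
        if upper is None:
            predicates.append(lower)  # last partition: open-ended above
        elif lower is None:
            predicates.append(f"{upper} or {column} is null")  # first: open below, plus NULLs
        else:
            predicates.append(f"{lower} AND {upper}")
    return predicates


# Options that would request this split from spark.read.format("jdbc"), passed with
# .options(**partitioned_read_options) alongside url, dbtable, user and password
partitioned_read_options = {
    "partitionColumn": "id",
    "lowerBound": "0",
    "upperBound": "100",
    "numPartitions": "4",
}

for predicate in jdbc_partition_predicates("id", 0, 100, 4):
    print(predicate)
```

If most ids lie far outside lowerBound and upperBound, the last partition does most of the work. That is why the bounds are normally set to the real minimum and maximum of the column.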
    
#### Select columns with where clause (use either dbtable or query, not both; query also cannot be combined with partitionColumn)
df = spark.read \ <br>
    .format("jdbc") \ <br>
    .option("driver","com.mysql.cj.jdbc.Driver") \ <br>
    .option("url", "jdbc:mysql://localhost:3306/emp") \ <br>
    .option("query","select id,age from employee where gender='M'") \ <br>
    .option("numPartitions",5) \ <br>
    .option("user", "root") \ <br>
    .option("password", "root") \ <br>
    .load() <br><br>
    
#### Using fetchsize (number of rows fetched per database round trip; the default depends on the JDBC driver, e.g. 10 for Oracle)
df = spark.read \ <br>
    .format("jdbc") \ <br>
    .option("driver","com.mysql.cj.jdbc.Driver") \ <br>
    .option("url", "jdbc:mysql://localhost:3306/emp") \ <br>
    .option("query","select id,age from employee where gender='M'") \ <br>
    .option("numPartitions",5) \ <br> 
    .option("fetchsize", 20) \  <br>
    .option("user", "root") \ <br>
    .option("password", "root") \ <br>
    .load() <br><br><br>

### Read and Write to SQL server

#### Writing data frame to SQL server table
df.write \ <br>
  .format("com.microsoft.sqlserver.jdbc.spark") \ <br>
  .mode("overwrite") \ <br>
  .option("url", "jdbc:sqlserver://{SERVER_ADDR};databaseName=emp;") \ <br>
  .option("dbtable", "employee") \ <br>
  .option("user", "replace_user_name") \ <br>
  .option("password", "replace_password") \ <br>
  .save() <br>
  
#### Reading data from SQL server table
df = spark.read \ <br>
  .format("com.microsoft.sqlserver.jdbc.spark") \ <br>
  .option("url", "jdbc:sqlserver://{SERVER_ADDR};databaseName=emp;") \ <br>
  .option("dbtable", "employee") \ <br>
  .option("user", "replace_user_name") \ <br>
  .option("password", "replace_password") \ <br>
  .load() <br>
  
#### Selecting specific columns to read
df = spark.read \ <br>
  .format("com.microsoft.sqlserver.jdbc.spark") \ <br>
  .option("url", "jdbc:sqlserver://{SERVER_ADDR};databaseName=emp;") \ <br>
  .option("dbtable", "(select id,age from employee where gender='M') as emp_m") \ <br>
  .option("user", "replace_user_name") \ <br>
  .option("password", "replace_password") \ <br>
  .load() <br><br><br>

### Read and Write to Mysql

spark = SparkSession.builder \ <br>
           .appName('SparkByExamples.com') \ <br>
           .config("spark.jars", "mysql-connector-java-8.0.13.jar") \ <br>
           .getOrCreate() <br>
           
#### Writing to mysql table
df.write \ <br>
  .format("jdbc") \ <br>
  .option("driver","com.mysql.cj.jdbc.Driver") \ <br>
  .option("url", "jdbc:mysql://localhost:3306/emp") \ <br>
  .option("dbtable", "employee") \ <br>
  .option("user", "root") \ <br>
  .option("password", "root") \ <br>
  .save() <br>
  
#### Reading from mysql table
df = spark.read \ <br>
    .format("jdbc") \ <br>
    .option("driver","com.mysql.cj.jdbc.Driver") \ <br>
    .option("url", "jdbc:mysql://localhost:3306/emp") \ <br>
    .option("dbtable", "employee") \ <br>
    .option("user", "root") \ <br>
    .option("password", "root") \ <br>
    .load() <br> <br>
    
    Note - You can use a query instead of table name to get specific columns
<br><br>

### Read JDBC Table

#### Read from MySQL Table
df = spark.read \ <br>
    .format("jdbc") \ <br>
    .option("url", "jdbc:mysql://localhost:3306/emp") \ <br>
    .option("driver", "com.mysql.cj.jdbc.Driver") \ <br>
    .option("dbtable", "employee") \ <br>
    .option("user", "root") \ <br>
    .option("password", "root") \ <br>
    .load() <br><br>
    
#### Read selected columns from MySQL Table
df = spark.read \ <br>
    .format("jdbc") \ <br>
    .option("url", "jdbc:mysql://localhost:3306/emp") \ <br>
    .option("driver", "com.mysql.cj.jdbc.Driver") \ <br>
    .option("dbtable", "(select id,age from employee where gender='M') as emp_m") \ <br>
    .option("user", "root") \ <br>
    .option("password", "root") \ <br>
    .load() <br><br>

df.show() <br><br>

#### Using numPartitions
df = spark.read \ <br>
  .format("jdbc") \ <br>
  .option("query", "select id,age from employee where gender='M'") \ <br>
  .option("numPartitions",5) \ <br>
  .option("fetchsize", 20) \ <br>
  ....... <br>
  .load() <br><br><br>

## Some built-in functions of PySpark

<b>expr</b> - executes a SQL-like expression string and returns the result as a Column<br>
<b>lit</b> - creates a Column from a literal (constant) value, e.g. to add a constant column<br>
<b>split</b> - splits a string into an array of strings based on a delimiter (a regular expression)<br>
<b>concat_ws</b> - concatenates strings, or the elements of an array, using the given separator<br>
<b>substring(str, pos, len)</b> - returns the substring that starts at position pos (1-based) and has length len<br>
<b>regexp_replace</b> - replaces every substring that matches a regular expression with a replacement string<br>
<b>translate</b> - replaces characters one for one: each character in the matching string is replaced by the character at the same position in the replacement string<br>
<b>overlay</b> - replaces part of a column's value with the value of another column, starting at a specified position<br>
<b>collect_list(), collect_set() </b>- aggregate the values of a column into an array; collect_list keeps duplicates, collect_set removes them<br>
<b>create_map</b> - creates a MapType column from alternating key and value columns<br>
<b>struct</b> - combines existing columns into a single StructType (nested) column<br>
<b>countDistinct</b> - counts the distinct values in a group<br><br>
