In [1]:
# pyspark.sql.SparkSession ------ Main entry point for DataFrame and SQL functionality.
# pyspark.sql.DataFrame ------ A distributed collection of data grouped into named columns.
# pyspark.sql.Column ----------A column expression in a DataFrame.
# pyspark.sql.Row ---------- A row of data in a DataFrame.
# pyspark.sql.GroupedData ------- Aggregation methods, returned by DataFrame.groupBy().
# pyspark.sql.DataFrameNaFunctions ----- Methods for handling missing data (null values).
# pyspark.sql.DataFrameStatFunctions ------ Methods for statistics functionality.
# pyspark.sql.functions --- List of built-in functions available for DataFrame.
# pyspark.sql.types --- List of data types available.
# pyspark.sql.Window --- For working with window functions.

In [2]:
import os
import pyspark
from pyspark.sql import SparkSession

In [3]:
# sc = SparkSession.builder.appName("PysparkExample")\    
# .config ("spark.sql.shuffle.partitions", "50")\    .config("spark.driver.maxResultSize","5g")\    
# .config ("spark.sql.execution.arrow.enabled", "true")\    .getOrCreate()

In [4]:
spark = SparkSession.builder.master("local").appName("Testing").config("spark.some.config.option", "some-value").getOrCreate()

# spark.stop()

In [5]:
spark

In [6]:
#----creating RDD -----

# columns = ["language","users_count"]
# a = [("Java", "20000"), ("Python", "100000"), ("Scala", "3000")]
# r = spark.sparkContext.parallelize(a)  #creating an RDD

In [7]:
#---------Using toDF() function creating Dataframe------------

# rd = r.toDF(columns)
# rd.printSchema()

In [8]:
#
from datetime import datetime, date
import pandas as pd
from pyspark.sql import Row

df = spark.createDataFrame([
    Row(a=1, b=2., c='string1', d=date(2000, 1, 1), e=datetime(2000, 1, 1, 12, 0)),
    Row(a=2, b=3., c='string2', d=date(2000, 2, 1), e=datetime(2000, 1, 2, 12, 0)),
    Row(a=4, b=5., c='string3', d=date(2000, 3, 1), e=datetime(2000, 1, 3, 12, 0))
])
df

DataFrame[a: bigint, b: double, c: string, d: date, e: timestamp]

In [9]:
df = spark.createDataFrame([
    (1, 2., 'string1', date(2000, 1, 1), datetime(2000, 1, 1, 12, 0)),
    (2, 3., 'string2', date(2000, 2, 1), datetime(2000, 1, 2, 12, 0)),
    (3, 4., 'string3', date(2000, 3, 1), datetime(2000, 1, 3, 12, 0))
], schema='a long, b double, c string, d date, e timestamp')
df

DataFrame[a: bigint, b: double, c: string, d: date, e: timestamp]

In [10]:
pandas_df = pd.DataFrame({
    'a': [1, 2, 3],
    'b': [2., 3., 4.],
    'c': ['string1', 'string2', 'string3'],
    'd': [date(2000, 1, 1), date(2000, 2, 1), date(2000, 3, 1)],
    'e': [datetime(2000, 1, 1, 12, 0), datetime(2000, 1, 2, 12, 0), datetime(2000, 1, 3, 12, 0)]
})
df = spark.createDataFrame(pandas_df)
df

DataFrame[a: bigint, b: double, c: string, d: date, e: timestamp]

In [None]:
#-----Using createDataFrame() from SparkSession creating Dataframe-----

# c = spark.createDataFrame(r).toDF(*columns)
# or
# c = spark.createDataFrame(data = a , schema = columns)
# df3 = spark.createDataFrame([], StructType([])) #empty df

# print(c.printSchema())
# print(type(c))
# print(c.show())

In [40]:
#----------------Create DataFrame with schema----------------------
# from pyspark.sql.types import StructType,StructField, StringType, IntegerType

# data2 = [("James","","Smith","36636","M",3000),
#     ("Michael","Rose","","40288","M",4000),
#     ("Robert","","Williams","42114","M",4000),
#     ("Maria","Anne","Jones","39192","F",4000),
#     ("Jen","Mary","Brown","","F",-1)
#   ]

# schema = StructType([ \
#     StructField("firstname",StringType(),True), \
#     StructField("middlename",StringType(),True), \
#     StructField("lastname",StringType(),True), \
#     StructField("id", StringType(), True), \
#     StructField("gender", StringType(), True), \
#     StructField("salary", IntegerType(), True) \
#   ])
 
# df = spark.createDataFrame(data=data2,schema=schema)
# df.printSchema()
# df.show(truncate=False)

In [28]:
#------------Create DataFrame from Data sources---------------
# schema = 'Age INTEGER, Sex STRING, ChestPainType STRING'
# df = spark.read.option('header','true').csv('heart.csv')
# df = spark.read.csv('/Users/mreznik/heart.csv',inferSchema=True, schema=schema,nullValue='NA' header=True)
# df = spark.read.json("examples/src/main/resources/people.json")
# df = spark.read.load('parquet_data.parquet')
# df = spark.read.load("Case.csv",format="csv", sep=",", inferSchema="true", header="true")
# df = spark.read.text("/src/resources/file.txt")

# df.write.format("csv").save("heart_save.csv")  # save data
# df.write.format("csv").mode("overwrite").save("heart_save.csv")  # if you want to overwrite the file

In [104]:
df = spark.read.csv('credit_record.csv',header=True)

In [105]:
df.printSchema()

root
 |-- ID: string (nullable = true)
 |-- GENDER: string (nullable = true)
 |-- INCOME: string (nullable = true)
 |-- FAMILY_STATUS: string (nullable = true)
 |-- HOUSING_TYPE: string (nullable = true)
 |-- FAM_MEMBERS: string (nullable = true)
 |-- STATUS: string (nullable = true)



In [123]:
df.show() #
# df.show(2,truncate=False) Display 2 rows and full column contents with truncate false
# df.show(2,truncate=25) # Display 2 rows & column values 25 characters
# df.show(n=3,truncate=25,vertical=True) # Display DataFrame rows & columns vertically

+-------+------+------+--------------+------------+-----------+------+
|     ID|GENDER|INCOME| FAMILY_STATUS|HOUSING_TYPE|FAM_MEMBERS|STATUS|
+-------+------+------+--------------+------------+-----------+------+
|5008804|     M|427500|Civil marriage|        Rent|          2|     X|
|5008805|     M|427500|Civil marriage|        Rent|          2|     0|
|5008806|     M|112500|       Married|       House|          2|     0|
|5008808|     F|270000|        Single|       House|          1|     0|
|5008809|     F|270000|        Single|       House|          1|     C|
|5008810|     F|270000|        Single|       House|          1|     C|
|5008811|     F|270000|        Single|       House|          1|     C|
|5008812|     F|283500|     Separated|       House|          1|     C|
|5008813|     F|283500|     Separated|       House|          1|     C|
|5008814|     F|283500|     Separated|       House|          1|     C|
|5008815|     M|270000|       Married|       House|          2|     C|
|51129

In [124]:
df.count()

5000

In [108]:
df.columns

['ID',
 'GENDER',
 'INCOME',
 'FAMILY_STATUS',
 'HOUSING_TYPE',
 'FAM_MEMBERS',
 'STATUS']

In [109]:
df.dtypes

[('ID', 'string'),
 ('GENDER', 'string'),
 ('INCOME', 'string'),
 ('FAMILY_STATUS', 'string'),
 ('HOUSING_TYPE', 'string'),
 ('FAM_MEMBERS', 'string'),
 ('STATUS', 'string')]

In [110]:
type(df)

pyspark.sql.dataframe.DataFrame

In [111]:
df.describe().show()

+-------+-----------------+------+------------------+--------------+---------------+------------------+--------------------+
|summary|               ID|GENDER|            INCOME| FAMILY_STATUS|   HOUSING_TYPE|       FAM_MEMBERS|              STATUS|
+-------+-----------------+------+------------------+--------------+---------------+------------------+--------------------+
|  count|             5000|  5000|              5000|          5000|           5000|              5000|                4999|
|   mean|     5140650.5812|  null|        195431.769|          null|           null|            2.1748|0.025404157043879907|
| stddev|374123.9451344954|  null|114032.49657124284|          null|           null|0.8525199810348216|  0.1573946918353626|
|    min|          5008804|     F|            101250|Civil marriage|Co-op apartment|                 1|                   0|
|    max|          6790437|     M|             99900|         Widow|   With parents|                 6|                   X|


In [114]:
df.head()

Row(ID='5008804', GENDER='M', INCOME='427500', FAMILY_STATUS='Civil marriage', HOUSING_TYPE='Rent', FAM_MEMBERS='2', STATUS='X')

In [115]:
df.tail(1)

[Row(ID='5024491', GENDER='M', INCOME='135000', FAMILY_STATUS='Married', HOUSING_TYPE='Rent', FAM_MEMBERS='3', STATUS=None)]

In [132]:
df["income"]

Column<'income'>

In [125]:
#convert it to Python Pandas DataFrame.
pdf = df.toPandas()
print(pdf)

           ID GENDER  INCOME   FAMILY_STATUS HOUSING_TYPE FAM_MEMBERS STATUS
0     5008804      M  427500  Civil marriage         Rent           2      X
1     5008805      M  427500  Civil marriage         Rent           2      0
2     5008806      M  112500         Married        House           2      0
3     5008808      F  270000          Single        House           1      0
4     5008809      F  270000          Single        House           1      C
...       ...    ...     ...             ...          ...         ...    ...
4995  5024487      F  225000          Single        House           1      0
4996  5024488      M  171000         Married        House           2      0
4997  5024489      M  171000         Married        House           2      0
4998  5024490      M  135000         Married         Rent           3      0
4999  5024491      M  135000         Married         Rent           3   None

[5000 rows x 7 columns]


In [128]:
type(pdf)

pandas.core.frame.DataFrame

In [134]:
df.limit(10).show()

+-------+------+------+--------------+------------+-----------+------+
|     ID|GENDER|INCOME| FAMILY_STATUS|HOUSING_TYPE|FAM_MEMBERS|STATUS|
+-------+------+------+--------------+------------+-----------+------+
|5008804|     M|427500|Civil marriage|        Rent|          2|     X|
|5008805|     M|427500|Civil marriage|        Rent|          2|     0|
|5008806|     M|112500|       Married|       House|          2|     0|
|5008808|     F|270000|        Single|       House|          1|     0|
|5008809|     F|270000|        Single|       House|          1|     C|
|5008810|     F|270000|        Single|       House|          1|     C|
|5008811|     F|270000|        Single|       House|          1|     C|
|5008812|     F|283500|     Separated|       House|          1|     C|
|5008813|     F|283500|     Separated|       House|          1|     C|
|5008814|     F|283500|     Separated|       House|          1|     C|
+-------+------+------+--------------+------------+-----------+------+



In [135]:
df.take(5)

[Row(ID='5008804', GENDER='M', INCOME='427500', FAMILY_STATUS='Civil marriage', HOUSING_TYPE='Rent', FAM_MEMBERS='2', STATUS='X'),
 Row(ID='5008805', GENDER='M', INCOME='427500', FAMILY_STATUS='Civil marriage', HOUSING_TYPE='Rent', FAM_MEMBERS='2', STATUS='0'),
 Row(ID='5008806', GENDER='M', INCOME='112500', FAMILY_STATUS='Married', HOUSING_TYPE='House', FAM_MEMBERS='2', STATUS='0'),
 Row(ID='5008808', GENDER='F', INCOME='270000', FAMILY_STATUS='Single', HOUSING_TYPE='House', FAM_MEMBERS='1', STATUS='0'),
 Row(ID='5008809', GENDER='F', INCOME='270000', FAMILY_STATUS='Single', HOUSING_TYPE='House', FAM_MEMBERS='1', STATUS='C')]

In [139]:
df.select('id','income','status').show() #SELECT COLUMNS

+-------+------+------+
|     id|income|status|
+-------+------+------+
|5008804|427500|     X|
|5008805|427500|     0|
|5008806|112500|     0|
|5008808|270000|     0|
|5008809|270000|     C|
|5008810|270000|     C|
|5008811|270000|     C|
|5008812|283500|     C|
|5008813|283500|     C|
|5008814|283500|     C|
|5008815|270000|     C|
|5112956|270000|     C|
|6153651|270000|     C|
|5008819|135000|     0|
|5008820|135000|     0|
|5008821|135000|     0|
|5008822|135000|     0|
|5008823|135000|     0|
|5008824|135000|     0|
|5008825|130500|     0|
+-------+------+------+
only showing top 20 rows



In [None]:
#change the name of the single column:
cases = cases.withColumnRenamed("infection_case","infection_source")
#for all columns:
cases = cases.toDF(*['case_id', 'province', 'city', 'group', 'infection_case', 'confirmed','latitude', 'longitude'])


#SORT
cases.sort("confirmed").show()

# descending Sort
from pyspark.sql import functions as F
cases.sort(F.desc("confirmed")).show()

#CAST
from pyspark.sql.types import DoubleType, IntegerType, StringType
cases = cases.withColumn('confirmed', F.col('confirmed').cast(IntegerType()))
cases = cases.withColumn('city', F.col('city').cast(StringType()))


#FILTER

cases.filter((cases.confirmed>10) & (cases.province=='Daegu')).show()

#GROUPBY
from pyspark.sql import functions as F
cases.groupBy(["province","city"]).agg(F.sum("confirmed") ,F.max("confirmed")).show()

#alias keyword to rename columns
cases.groupBy(["province","city"]).agg(F.sum("confirmed").alias("TotalConfirmed"),F.max("confirmed").alias("MaxFromOneConfirmedCase")).show()


#JOINS
cases = cases.join(regions, ['province','city'],how='left')
cases.limit(10).toPandas()

#Broadcast/Map Side Joins
from pyspark.sql.functions import broadcast
cases = cases.join(broadcast(regions), ['province','city'],how='left')


#Use SQL With Data Frames
cases.registerTempTable('cases_table')
newDF = sqlContext.sql('select * from cases_table where confirmed>100')
newDF.show()

#Create New Columns
#USING SPARK NATIVE FUNCTIONS
import pyspark.sql.functions as F
casesWithNewConfirmed = cases.withColumn("NewConfirmed", 100 + F.col("confirmed"))
casesWithNewConfirmed.show()
#math functions like the F.exp function:
casesWithExpConfirmed = cases.withColumn("ExpConfirmed", F.exp("confirmed"))
casesWithExpConfirmed.show()


#USING SPARK UDFS
import pyspark.sql.functions as F
from pyspark.sql.types import *
def casesHighLow(confirmed):
    if confirmed < 50: 
        return 'low'
    else:
        return 'high'
    
#convert to a UDF Function by passing in the function and return type of function
casesHighLowUDF = F.udf(casesHighLow, StringType())
CasesWithHighLow = cases.withColumn("HighLow", casesHighLowUDF("confirmed"))
CasesWithHighLow.show()


#USING RDDS
    import math
    from pyspark.sql import Row
    def rowwise_function(row):
        # convert row to python dictionary:
        row_dict = row.asDict()
        # Add a new key in the dictionary with the new column name and 
    value.
        # This might be a big complex function.
        row_dict['expConfirmed'] = float(np.exp(row_dict['confirmed']))
        # convert dict to row back again:
        newrow = Row(**row_dict)
        # return new row
        return newrow
    # convert cases dataframe to RDD
    cases_rdd = cases.rdd
    # apply our function to RDD
    cases_rdd_new = cases_rdd.map(lambda row: rowwise_function(row))
    # Convert RDD Back to DataFrame
    casesNewDf = sqlContext.createDataFrame(cases_rdd_new)
    casesNewDf.show()
    
    
    
#USING PANDAS UDF
cases.printSchema()


In [None]:
from pyspark.sql.types import IntegerType, StringType, DoubleType, BooleanType
from pyspark.sql.types import StructType, StructField

# Declare the schema for the output of our function

outSchema = StructType([StructField('case_id',IntegerType(),True),
                        StructField('province',StringType(),True),
                        StructField('city',StringType(),True),
                        StructField('group',BooleanType(),True),
                        StructField('infection_case',StringType(),True),
                        StructField('confirmed',IntegerType(),True),
                        StructField('latitude',StringType(),True),
                        StructField('longitude',StringType(),True),
                        StructField('normalized_confirmed',DoubleType(),True)
                       ])
# decorate our function with pandas_udf decorator
@F.pandas_udf(outSchema, F.PandasUDFType.GROUPED_MAP)
def subtract_mean(pdf):
    # pdf is a pandas.DataFrame
    v = pdf.confirmed
    v = v - v.mean()
    pdf['normalized_confirmed'] = v
    return pdf

confirmed_groupwise_normalization = cases.groupby("infection_case").apply(subtract_mean)

confirmed_groupwise_normalization.limit(10).toPandas()

In [None]:
#Spark Window Functions

#RANKING'
from pyspark.sql.window import Window
windowSpec = Window().partitionBy(['province']).orderBy(F.desc('confirmed'))
cases.withColumn("rank",F.rank().over(windowSpec)).show()


#LAG VARIABLES
from pyspark.sql.window import Window
windowSpec = Window().partitionBy(['province']).orderBy('date')
timeprovinceWithLag = timeprovince.withColumn("lag_7",F.lag("confirmed", 7).over(windowSpec))
timeprovinceWithLag.filter(timeprovinceWithLag.date>'2020-03-10').show()



In [None]:
# ROLLING AGGREGATIONS
from pyspark.sql.window import Window
windowSpec = Window().partitionBy(['province']).orderBy('date').rowsBetween(-6,0)
timeprovinceWithRoll = timeprovince.withColumn("roll_7_confirmed",F.mean("confirmed").over(windowSpec))
timeprovinceWithRoll.filter(timeprovinceWithLag.date>'2020-03-10').show()

In [None]:
from pyspark.sql.window import Window
windowSpec = Window().partitionBy(['province']).orderBy('date').rowsBetween(Window.
unboundedPreceding,Window.currentRow)
timeprovinceWithRoll = timeprovince.withColumn("cumulative_confirmed",F.sum("confirmed").over
(windowSpec))
timeprovinceWithRoll.filter(timeprovinceWithLag.date>'2020-03-
10').show()

In [None]:
#Pivot Data Frames
pivotedTimeprovince = 
timeprovince.groupBy('date').pivot('province').agg(F.sum('confirmed').
alias('confirmed') , F.sum('released').alias('released'))
pivotedTimeprovince.limit(10).toPandas()

In [None]:
#Unpivot/Stack Data Frames

newColnames = [x.replace("-","_") for x in 
pivotedTimeprovince.columns]
pivotedTimeprovince = pivotedTimeprovince.toDF(*newColnames)

In [None]:
unpivotedTimeprovince = pivotedTimeprovince.select('date',F.expr(exprs))

In [None]:
#Salting

#STEP ONE: CREATE A SALTING KEY
cases = cases.withColumn("salt_key", F.concat(F.col("infection_case"), F.lit("_"), F.monotonically_increasing_id() % 10))
#STEP TWO: FIRST GROUPBY ON SALT KEY
cases_temp = cases.groupBy(["infection_case","salt_key"]).agg(F.sum("confirmed")).alias("salt_confirmed").show()

#3. SECOND GROUP ON THE ORIGINAL KEY
cases_temp = cases.groupBy(["infection_case"]).agg(F.sum("confirmed")).alias("salt_confirmed").show()




In [None]:
#CACHING
df.cache().count()

In [None]:
#SAVE AND LOAD FROM AN INTERMEDIATE STEP
df.write.parquet("data/df.parquet")
df.unpersist()
spark.read.load("data/df.parquet")

In [None]:
#REPARTITIONING
df = df.repartition(1000)
df = df.repartition('cola', 'colb','colc','cold')
df.rdd.getNumPartitions()
df.glom().map(len).collect()


In [None]:
#READING PARQUET FILE IN LOCAL

from glob import glob
def load_df_from_parquet(parquet_directory):
   df = pd.DataFrame()
   for file in glob(f"{parquet_directory}/*"):
      df = pd.concat([df,pd.read_parquet(file)])
   return df

In [None]:
# SparkSession Commonly Used Methods

# version() – Returns the Spark version where your application is running, probably the Spark version your cluster is configured with.

# createDataFrame() – This creates a DataFrame from a collection and an RDD

# getActiveSession() – 

# read() – Returns an instance of DataFrameReader class, this is used to read records from csv, parquet, avro, and more file formats into DataFrame.

# readStream() – Returns an instance of DataStreamReader class, this is used to read streaming data. that can be used to read streaming data into DataFrame.

# sparkContext() – Returns a SparkContext.

# sql() – Returns a DataFrame after executing the SQL mentioned.

# sqlContext() – Returns SQLContext.

# stop() – Stop the current SparkContext.

# table() – Returns a DataFrame of a table or view.

# udf() – Creates a PySpark UDF to use it on DataFrame, Dataset, and SQL.

In [None]:
# Create PySpark DataFrame
# SparkSession also provides several methods to create a Spark DataFrame and DataSet. The below example uses the createDataFrame() method which takes a list of data.


# Create DataFrame
df = spark.createDataFrame(
    [("Scala", 25000), ("Spark", 35000), ("PHP", 21000)])
df.show()

In [None]:
4.3 Working with Spark SQL
Using SparkSession you can access PySpark/Spark SQL capabilities in PySpark. In order to use SQL features first, you need to create a temporary view in PySpark. Once you have a temporary view you can run any ANSI SQL queries using spark.sql() method.


# Spark SQL
df.createOrReplaceTempView("sample_table")
df2 = spark.sql("SELECT _1,_2 FROM sample_table")
df2.show()


PySpark SQL temporary views are session-scoped and will not be available if the session that creates it terminates. If you want to have a temporary view that is shared among all sessions and keep alive until the Spark application terminates, you can create a global temporary view using createGlobalTempView()

4.4 Create Hive Table
As explained above SparkSession is used to create and query Hive tables. Note that in order to do this for testing you don’t need Hive to be installed. saveAsTable() creates Hive managed table. Query the table using spark.sql().


# Create Hive table & query it.  
spark.table("sample_table").write.saveAsTable("sample_hive_table")
df3 = spark.sql("SELECT _1,_2 FROM sample_hive_table")
df3.show()


4.5 Working with Catalogs
To get the catalog metadata, PySpark Session exposes catalog variable. Note that these methods spark.catalog.listDatabases and spark.catalog.listTables and returns the DataSet.


# Get metadata from the Catalog
# List databases
dbs = spark.catalog.listDatabases()
print(dbs)

# Output
#[Database(name='default', description='default database', 
#locationUri='file:/Users/admin/.spyder-py3/spark-warehouse')]

# List Tables
tbls = spark.catalog.listTables()
print(tbls)

#Output
#[Table(name='sample_hive_table', database='default', description=None, tableType='MANAGED', #isTemporary=False), Table(name='sample_hive_table1', database='default', description=None, #tableType='MANAGED', isTemporary=False), Table(name='sample_hive_table121', database='default', #description=None, tableType='MANAGED', isTemporary=False), Table(name='sample_table', database=None, #description=None, tableType='TEMPORARY', isTemporary=True)]

Notice the two tables we have created, Spark table is considered a temporary table and Hive table as managed table.