In [None]:


# display the schema: prints every column's name, Spark data type and nullability as a tree
creditcardDF.printSchema()

In [None]:
# show the first 5 rows; truncate=False prints full cell values instead of cutting long strings short
creditcardDF.show(n=5, truncate=False)

In [None]:
# defining your own schema
from pyspark.sql.types import StructType, StructField, StringType, DoubleType, FloatType, LongType, IntegerType, DateType

# define the structure: one nullable StructField per (column name, Spark type) pair, in file column order
schema = StructType([
    StructField(field_name, field_type)
    for field_name, field_type in [
        ("Region", StringType()),
        ("Country", StringType()),
        ("Item", StringType()),
        ("SalesChannel", StringType()),
        ("OrderPriority", StringType()),
        ("OrderDate", StringType()),
        ("OrderID", LongType()),
        ("ShipDate", StringType()),
        ("UnitsSold", IntegerType()),
        ("UnitPrice", FloatType()),
        ("UnitCost", FloatType()),
        ("TotalRevenue", DoubleType()),
        ("TotalCost", DoubleType()),
        ("TotalProfit", DoubleType()),
    ]
])

# read the file by using the defined schema

# display the schema


In [None]:
# display the records


In [None]:
# selecting only a few columns
from pyspark.sql.functions import col, column
import pyspark.sql.functions as F

# select a few columns


In [None]:
# inspect the current column names before renaming any of them
list(creditcardDF.columns)

In [None]:
# renaming the columns - "Sales Channel" to "SalesChannel" and "Item Type" to "Item"
# each withColumnRenamed returns a new DataFrame; creditcardDF itself keeps its original names
creditcardDF2 = (
    creditcardDF
    .withColumnRenamed("Sales Channel", "SalesChannel")
    .withColumnRenamed('Item Type', 'Item')
)

In [None]:
# display records: first 3 rows of the renamed DataFrame, untruncated
creditcardDF2.show(n=3, truncate=False)

In [None]:
# change column data type
from pyspark.sql.functions import col

# cast "Order ID" to long (its loaded type is presumably integer — confirm with printSchema above);
# withColumn with an existing column name replaces that column in the returned DataFrame
df = creditcardDF.withColumn(
    "Order ID",
    col("Order ID").cast("long"),
)

df.printSchema()  # display the schema to verify the new type

In [None]:
creditcardDF.show(n=4, truncate=False)  # first 4 rows of the original DataFrame, untruncated

In [None]:
# adding columns to a dataframe
import pyspark.sql.functions as F

# add a new column "Register_Site" holding the same constant "www.google.com" on every row
dataDF = creditcardDF.withColumn(
    "Register_Site",
    F.lit("www.google.com"),
)

# display only a few columns (first 3 rows, untruncated)
dataDF.select(*["Region", "Country", "Item Type", "Register_Site"]).show(3, False)

In [None]:
# removing columns from a DataFrame

# number of columns in a dataframe - before removing columns
print("Number of columns : ", len(dataDF.columns))

# columns - before dropping, printed as a plain Python list
print([name for name in dataDF.columns])

# drop columns - "Country", "Item Type"


# number of columns in a dataframe - after removing columns


# columns - after dropping


In [None]:
# arithmetic with dataframes
from pyspark.sql.functions import col

# number of columns in a dataframe - before adding a column
print("Number of columns : ", len(creditcardDF.columns))

# perform arithmetic operations on a dataframe column:
# derive "Profit Margin" = Total Profit / Total Revenue as a new column.
# creditnewDF must be assigned here because the statements below read it.
# NOTE(review): under Spark's default non-ANSI mode a zero "Total Revenue" yields null rather than
# raising — confirm for your runtime's spark.sql.ansi.enabled setting.
creditnewDF = creditcardDF.withColumn(
    "Profit Margin",
    col("Total Profit") / col("Total Revenue"),
)

# number of columns in a dataframe - after adding columns
print("Number of columns : ", len(creditnewDF.columns))

# display records
creditnewDF.show(3)

In [None]:
# filter a dataframe



In [None]:
# filter a dataframe - multiple columns



In [None]:
# dropping rows
# sample [ID, Month] rows with deliberate repeats (e.g. [1, "January"], [3, "March"], [4, "April"])
# and one row, [5, "April"], that shares an ID but not a Month with others
testDF = [
    [1, "January"], [2, "February"], [1, "January"],
    [3, "March"], [3, "March"], [3, "March"],
    [4, "April"], [4, "April"],
    [5, "May"], [5, "May"],
    [4, "April"], [6, "June"], [5, "April"],
]

# import the modules
from pyspark.sql.types import *

# define the schema: ID as integer, Month as string
schema = StructType([
    StructField("ID", IntegerType()),
    StructField("Month", StringType()),
])

# create the dataframe by applying schema, then display the records
df = spark.createDataFrame(testDF, schema=schema)
df.show()

In [None]:
# display distinct rows


In [None]:
# drop duplicate records based on a column value

# drop duplicate records based on multiple column values


In [None]:
# rename existing columns: "Unit Price" -> "UnitPrice" and "Total Profit" -> "Total_Profit"
newDF = creditcardDF.withColumnRenamed("Unit Price", "UnitPrice").withColumnRenamed("Total Profit", "Total_Profit")

# display the renamed records; creditcardDF still carries the original column names,
# so showing it here would not reflect the rename
newDF.show(3)

from pyspark.sql.functions import expr # define the modules

# using select expression


In [None]:
from pyspark.sql.types import *   # import the libraries

# define a list of [name, experience] rows; two rows have no experience value (None)
list_data = [
    ["Bill Gates", 23],
    ["Henry Ford", None],
    ["Tim Cook", None],
]

# define the schema: Name as string, Experience as integer (None is stored as null)
schema = StructType([
    StructField("Name", StringType()),
    StructField("Experience", IntegerType()),
])

# create a dataframe and display it
df = spark.createDataFrame(list_data, schema=schema)
df.show()

In [None]:
# drop null value rows


In [None]:
# fill null value with a constant value


In [None]:
# replace a single value


In [None]:
# replace multiple values and also fill 'null' with a constant value


In [None]:
# rename the existing columns - "Item Type" to "ItemType" and "Total Profit" to "Total_Profit"
# (later cells average "Total_Profit" and count "ItemType" on this newDF)
newDF = (
    creditcardDF
    .withColumnRenamed("Item Type", "ItemType")
    .withColumnRenamed("Total Profit", "Total_Profit")
)

# find maximum total_profit for each region and alias the column to "Maximum"


In [None]:
# count of items in each region


In [None]:
from pyspark.sql.functions import avg # include the library

# average of column "Total_Profit" over all rows, returned as one row headed "Average Profit"
newDF.select(
    avg("Total_Profit").alias("Average Profit")
).show()

In [None]:
# include the library
from pyspark.sql.functions import col

# order the records by region - ascending (nulls first), then keep a few identifying columns
creditcardDF.orderBy(col("Region").asc()) \
    .select("Region", "Country", "Item Type", "Order ID", "Total Profit") \
    .show(3)

In [None]:
# include the library
from pyspark.sql.functions import col

# order the records by region - descending (nulls last), then keep a few identifying columns
creditcardDF.orderBy(col("Region").desc()) \
    .select("Region", "Country", "Item Type", "Order ID", "Total Profit") \
    .show(3)

In [None]:
# cache and persist
from pyspark import StorageLevel

# cache the dataframe with Spark's default DataFrame storage level (memory, spilling to disk when needed).
# cache() is lazy and returns the same DataFrame object, so cacheDF and creditcardDF share the
# cached data; the first action below (show) is what actually materializes it.
cacheDF = creditcardDF.cache()

# read the records from cache
cacheDF.select("Region", "Country", "Item Type", "Sales Channel", "Order Priority",
               "Order Date", "Order ID", "Ship Date").show(4, truncate=False)

In [None]:
# cache and persist
from pyspark import StorageLevel

# if creditcardDF was already cached (e.g. by cache()), Spark ignores a new storage level and only
# logs "Asked to cache already cached data" — release any existing cache so persist() takes effect
creditcardDF.unpersist()

# persist the dataframe in both memory and disk
persistDF = creditcardDF.persist(StorageLevel.MEMORY_AND_DISK)

# read the records from saved dataframe
persistDF.select("Region", "Country", "Item Type", "Sales Channel", "Order Priority",
                 "Order Date", "Order ID").show(4, truncate=False)

In [None]:
# coalesce vs repartition

# increase the number of partitions

# number of partitions after repartitioning

# reduce the number of partitions

# number of partitions after coalesce


In [None]:
# count the ItemType values per Region, then coalesce to one partition so a single part file is written
writeDF = (
    newDF.groupBy("Region")
    .agg({'ItemType': 'count'})
    .coalesce(1)
)

# write to DBFS as CSV with a header row - mode "overwrite" replaces existing output ("append" would add to it)
(writeDF.write
    .option("header", "true")
    .option("sep", ",")
    .mode("overwrite")
    .csv("/FileStore/tables/Aggregate/"))

In [None]:
%fs ls "/FileStore/tables/Aggregate/"

In [None]:
# read the csv file(s) written to /FileStore/tables/Aggregate/.
# Load the output directory, not a pinned part-00000-tid-... file name: Spark generates a new part
# file name on every write, so a hard-coded name breaks as soon as the write cell is re-run.
# Spark ignores files whose names start with "_" or "." (e.g. _SUCCESS) when reading a directory.
newDF = (spark.read.format("csv")
         .option("header", "true")
         .option("inferSchema", "true")
         .load("/FileStore/tables/Aggregate/"))

# display the records
newDF.show(10, False)

In [None]:
# spark SQL
# create a DataFrame mapping each sales region to a sales person
from pyspark.sql.types import *   # import the library
leader_data = [
    ["Middle East and North Africa", "Mohammed Saif"],
    ["Australia and Oceania", "George Carlin"],
    ["Europe", "Stuart Broad"],
    ["Sub-Saharan Africa", "Abdalla"],
    ["Central America and the Caribbean", "Chris Gayle"],
    ["North America", "George Bush"],
    ["Asia", "Tatyaso Martin"],
]

# define the schema: two string columns, Region and SalesPerson
schema = StructType([
    StructField("Region", StringType()),
    StructField("SalesPerson", StringType()),
])

# create a dataframe and display the records
df = spark.createDataFrame(leader_data, schema=schema)
df.show(n=10, truncate=False)

In [None]:
# register the dataframe as a temporary view so SQL queries can refer to it as "sales_table"
df.createOrReplaceTempView("sales_table")

# write sql queries using sql(); each query returns a DataFrame
sql("select * from sales_table").show(n=10, truncate=False)

In [None]:
sql("select * from sales_table where Region = 'Asia'").show(n=10, truncate=False)  # rows for the Asia region

In [None]:
sql("select * from sales_table where SalesPerson like '%George%'").show(n=10, truncate=False)  # SalesPerson containing "George"

In [None]:
sql("select count(*) from sales_table").show(n=20)  # total row count of the view

In [None]:
# expose creditcardDF to SQL under the view name "creditcard"
creditcardDF.createOrReplaceTempView("creditcard")

# preview a single untruncated row of the view
sql("select * from creditcard").show(n=1, truncate=False)

In [None]:
# renaming a column using DSL, so the SQL below can reference TotalRevenue without back-quoting a space
newDF = creditcardDF.withColumnRenamed("Total Revenue", "TotalRevenue")

# create a temp view; this replaces the "creditcard" view registered from creditcardDF earlier
newDF.createOrReplaceTempView("creditcard")

# apply aggregations on the table data: the highest TotalRevenue in each Region
sql(
    "select Region, max(TotalRevenue) from creditcard group by Region"
).show(n=20, truncate=False)

In [None]:
sql("select Region, max(TotalRevenue) from creditcard group by Region order by Region").show(n=20, truncate=False)  # per-region max, A-Z

In [None]:
sql("select Region, max(TotalRevenue) from creditcard group by Region order by Region desc").show(n=20, truncate=False)  # per-region max, Z-A

In [None]:
# join (inner) creditcard and sales_table on Region, display the first 5 results untruncated;
# trim() on both sides presumably guards against stray whitespace in Region values
(
    sql("""select a.Region, a.Country, b.SalesPerson
       from creditcard a
       join sales_table b
       on trim(a.Region) = trim(b.Region)""")
    .show(n=5, truncate=False)
)

In [None]:
# join (inner) creditcard and sales_table, apply a where condition, display the results.
# Keep the query result in df and call show() separately: show() returns None, so chaining it
# onto the assignment would leave df bound to None instead of the joined DataFrame.
df = sql("""select a.Region, a.Country, b.SalesPerson
       from creditcard a
       join sales_table b
       on trim(a.Region) = trim(b.Region)
       where trim(a.Region) = "Asia"
       """)
df.show(5, False)

In [None]:
# write the results in to DBFS
# inner join of creditcard with sales_table, restricted to the Asia region
df = sql("""select a.Region, a.Country, b.SalesPerson
       from creditcard a
       join sales_table b
       on trim(a.Region) = trim(b.Region)
       where trim(a.Region) = "Asia"
       """)

# coalesce(1) -> a single part file; include a header row; "overwrite" replaces any previous output
(df.coalesce(1)
   .write
   .option("header", "true")
   .mode("overwrite")
   .csv("/FileStore/tables/spark/"))

In [None]:
%fs ls "/FileStore/tables/spark/" 

In [None]:
# read the csv file(s) from the stored location.
# Load the output directory rather than the generated part-00000-tid-... file name, which changes
# on every overwrite and would make this cell fail once the write cell is re-run.
# Spark ignores files whose names start with "_" or "." (e.g. _SUCCESS) when reading a directory.
newDF = (spark.read.format("csv")
         .option("header", "true")
         .option("inferSchema", "true")
         .load("/FileStore/tables/spark/"))

newDF.show(10, False)