In [1]:
#import a sparksession
from pyspark.sql import SparkSession

In [2]:
#create a sparksession and store in spark variable
spark=SparkSession.builder \
.appName("test") \
.getOrCreate()

In [3]:
#create a df with 10 numbers
df=spark.range(10).toDF("numbers")
df.show()

+-------+
|numbers|
+-------+
|      0|
|      1|
|      2|
|      3|
|      4|
|      5|
|      6|
|      7|
|      8|
|      9|
+-------+



In [4]:
#from df take only divisibile_by_2 numbers and store in divisibile_by_2 variable
divisible_by_2=df.where("numbers%2=0")
divisible_by_2.show()

+-------+
|numbers|
+-------+
|      0|
|      2|
|      4|
|      6|
|      8|
+-------+



In [5]:
#load the csv file and store it in csv_load variable and show the output
csv_load=spark.read.format("csv").option("header","true").option("inferSchema","true").csv("2015-summary.csv")
csv_load.show()

+--------------------+-------------------+-----+
|   DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|
+--------------------+-------------------+-----+
|       United States|            Romania|   15|
|       United States|            Croatia|    1|
|       United States|            Ireland|  344|
|               Egypt|      United States|   15|
|       United States|              India|   62|
|       United States|          Singapore|    1|
|       United States|            Grenada|   62|
|          Costa Rica|      United States|  588|
|             Senegal|      United States|   40|
|             Moldova|      United States|    1|
|       United States|       Sint Maarten|  325|
|       United States|   Marshall Islands|   39|
|              Guyana|      United States|   64|
|               Malta|      United States|    1|
|            Anguilla|      United States|   41|
|             Bolivia|      United States|   30|
|       United States|           Paraguay|    6|
|             Algeri

In [6]:
#load the csv file and store it in csv_load variable and sort the data with count column and show the output
from pyspark.sql.functions import desc,asc
a=csv_load.sort(asc("count"))
a.show()
a.explain()

+--------------------+-------------------+-----+
|   DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|
+--------------------+-------------------+-----+
|       United States|            Estonia|    1|
|              Kosovo|      United States|    1|
|              Zambia|      United States|    1|
|       United States|   Papua New Guinea|    1|
|               Malta|      United States|    1|
|       United States|          Gibraltar|    1|
|            Suriname|      United States|    1|
|       United States|            Croatia|    1|
|            Djibouti|      United States|    1|
|        Burkina Faso|      United States|    1|
|Saint Vincent and...|      United States|    1|
|       United States|             Cyprus|    1|
|       United States|          Singapore|    1|
|             Moldova|      United States|    1|
|              Cyprus|      United States|    1|
|       United States|          Lithuania|    1|
|       United States|           Bulgaria|    1|
|       United State

In [7]:
#load the csv file and store it in csv_load variable and create a temporary table "sql_csv_table"
csv_load.createOrReplaceTempView("sql_csv_table")

In [8]:
#load the csv file and store it in csv_load variable and create a temporary table "sql_csv_table",
#write a query in sql with grouping DEST_COUNTRY_NAME having count(1),stroing the data in sql_data variable
sql_data=spark.sql("select DEST_COUNTRY_NAME,count(*) from sql_csv_table group by DEST_COUNTRY_NAME")
sql_data.show()

+--------------------+--------+
|   DEST_COUNTRY_NAME|count(1)|
+--------------------+--------+
|            Anguilla|       1|
|              Russia|       1|
|            Paraguay|       1|
|             Senegal|       1|
|              Sweden|       1|
|            Kiribati|       1|
|              Guyana|       1|
|         Philippines|       1|
|            Djibouti|       1|
|            Malaysia|       1|
|           Singapore|       1|
|                Fiji|       1|
|              Turkey|       1|
|                Iraq|       1|
|             Germany|       1|
|              Jordan|       1|
|               Palau|       1|
|Turks and Caicos ...|       1|
|              France|       1|
|              Greece|       1|
+--------------------+--------+
only showing top 20 rows



In [9]:
#load the csv file and store it in csv_load variable and create a temporary table "sql_csv_table",
# write a query in sql with max count column,stroing the data in sql_data variable
#==========
#load the csv file and store it in csv_load variable, select max of the count column store the result in pyspark_sql variable

sql_data=spark.sql("select max(count) from sql_csv_table")
sql_data.show()

+----------+
|max(count)|
+----------+
|    370002|
+----------+



In [10]:
#write a query in sql selecting DEST_COUNTRY_NAME and sum of count of grouping DEST_COUNTRY_NAME as destination,show the result in desc
#store the result in csv_sql variable
csv_sql=spark.sql("select DEST_COUNTRY_NAME,sum(count) as destination from sql_csv_table group by DEST_COUNTRY_NAME order by destination desc")
csv_sql.show()
csv_sql.explain(True)

+------------------+-----------+
| DEST_COUNTRY_NAME|destination|
+------------------+-----------+
|     United States|     411352|
|            Canada|       8399|
|            Mexico|       7140|
|    United Kingdom|       2025|
|             Japan|       1548|
|           Germany|       1468|
|Dominican Republic|       1353|
|       South Korea|       1048|
|       The Bahamas|        955|
|            France|        935|
|          Colombia|        873|
|            Brazil|        853|
|       Netherlands|        776|
|             China|        772|
|           Jamaica|        666|
|        Costa Rica|        588|
|       El Salvador|        561|
|            Panama|        510|
|              Cuba|        466|
|             Spain|        420|
+------------------+-----------+
only showing top 20 rows

== Parsed Logical Plan ==
'Sort ['destination DESC NULLS LAST], true
+- 'Aggregate ['DEST_COUNTRY_NAME], ['DEST_COUNTRY_NAME, 'sum('count) AS destination#99]
   +- 'UnresolvedRelatio

In [11]:
#load the csv file and store it in csv_load variable
#selecting DEST_COUNTRY_NAME and sum of count of grouping DEST_COUNTRY_NAME as destination,show the result in desc
from pyspark.sql.functions import sum,col
a=csv_load.groupBy("DEST_COUNTRY_NAME").sum("count").withColumnRenamed("sum(count)","destination").sort(desc("destination"))
a.show()

+------------------+-----------+
| DEST_COUNTRY_NAME|destination|
+------------------+-----------+
|     United States|     411352|
|            Canada|       8399|
|            Mexico|       7140|
|    United Kingdom|       2025|
|             Japan|       1548|
|           Germany|       1468|
|Dominican Republic|       1353|
|       South Korea|       1048|
|       The Bahamas|        955|
|            France|        935|
|          Colombia|        873|
|            Brazil|        853|
|       Netherlands|        776|
|             China|        772|
|           Jamaica|        666|
|        Costa Rica|        588|
|       El Salvador|        561|
|            Panama|        510|
|              Cuba|        466|
|             Spain|        420|
+------------------+-----------+
only showing top 20 rows



In [12]:
#load the by_day folder and store in load_byday_csv and create the sql temp table as retail_data, assign the schema of load_byday_csv to schema
#and print the schema
from pyspark.sql.functions import input_file_name
load_byday_csv=spark.read.format("csv").option("header","true").option("inferSchema","true").csv("directory to extract/by-day/*.csv")
load_byday_csv.createOrReplaceTempView("retail_data")
schema=load_byday_csv.schema
load_byday_csv.show()

load_byday_csv1=spark.read.format("csv").option("header","true").option("inferSchema","true").csv("directory to extract/by-day").withColumn("filename",input_file_name())
load_byday_csv1.show(5,False)

+---------+---------+--------------------+--------+-------------------+---------+----------+--------------+
|InvoiceNo|StockCode|         Description|Quantity|        InvoiceDate|UnitPrice|CustomerID|       Country|
+---------+---------+--------------------+--------+-------------------+---------+----------+--------------+
|   580538|    23084|  RABBIT NIGHT LIGHT|      48|2011-12-05 08:38:00|     1.79|   14075.0|United Kingdom|
|   580538|    23077| DOUGHNUT LIP GLOSS |      20|2011-12-05 08:38:00|     1.25|   14075.0|United Kingdom|
|   580538|    22906|12 MESSAGE CARDS ...|      24|2011-12-05 08:38:00|     1.65|   14075.0|United Kingdom|
|   580538|    21914|BLUE HARMONICA IN...|      24|2011-12-05 08:38:00|     1.25|   14075.0|United Kingdom|
|   580538|    22467|   GUMBALL COAT RACK|       6|2011-12-05 08:38:00|     2.55|   14075.0|United Kingdom|
|   580538|    21544|SKULLS  WATER TRA...|      48|2011-12-05 08:38:00|     0.85|   14075.0|United Kingdom|
|   580538|    23126|FELTCRA

In [13]:
#load the by_day folder and store in load_byday_csv and select CustomerID,UnitPrice * Quantity as total_cost,InvoiceDate
#group by CustomerID with window function as 1 day, sum the total and rename it to total_cost and sort with desc of total_cost
from pyspark.sql.functions import desc,asc,window,col
a=load_byday_csv.selectExpr("CustomerID","(UnitPrice * Quantity) as total_cost","InvoiceDate") \
    .groupBy("CustomerID",window(col("InvoiceDate"),"1 day")).sum("total_cost") \
    .withColumnRenamed("sum(total_cost)","total_cost").sort(desc("total_cost"))
a.show(5,False)

+----------+------------------------------------------+------------------+
|CustomerID|window                                    |total_cost        |
+----------+------------------------------------------+------------------+
|17450.0   |{2011-09-20 00:00:00, 2011-09-21 00:00:00}|71601.44          |
|NULL      |{2011-11-14 00:00:00, 2011-11-15 00:00:00}|55316.08          |
|NULL      |{2011-11-07 00:00:00, 2011-11-08 00:00:00}|42939.17          |
|NULL      |{2011-03-29 00:00:00, 2011-03-30 00:00:00}|33521.39999999998 |
|NULL      |{2011-12-08 00:00:00, 2011-12-09 00:00:00}|31975.590000000007|
+----------+------------------------------------------+------------------+
only showing top 5 rows



In [14]:
#manually set the shuffle partitions to 5
spark.conf.set("spark.sql.shuffle.partitions","5")

In [15]:
#load the csv files and store in load_by_data variable, store the schema in schema variable
#read the stream by setting maxFilesPerTrigger as 1
streaming_data=spark.readStream.schema(schema).option("header","true").option("maxFilesPerTrigger",1) \
                .load("directory to extract/by-day/*.csv")

In [16]:
#check if its streaming or not
streaming_data.isStreaming

True

In [17]:
#load the csv files and store in load_by_data variable, store the schema in schema variable
#read the stream by setting maxFilesPerTrigger as 1 and check if its streaming or not in read_stream variable
#with read_stream select CustomerID,UnitPrice * Quantity as total_cost,InvoiceDate
#group by CustomerID with window function as 1 day, sum the total_cost store the entire result in purchasebycustomerperhour
from pyspark.sql.functions import col,window
load_by_data=spark.read.format("csv").option("header","true").option("inferSchema","true").load("directory to extract/by-day/*")
schema=load_by_data.schema
read_stream=spark.readStream.option("maxFilesPerTrigger",1).schema(schema).option("header","true").csv("directory to extract/by-day/*")
purchasebycustomerperhour=read_stream.selectExpr("CustomerID","(UnitPrice * Quantity) as total_cost","InvoiceDate") \
                        .groupBy("CustomerID",window(col("InvoiceDate"),"1 day")).sum("total_cost")

In [18]:
#stop the write_stream and with purchasebycustomerperhour write the stream,store in memory,and tablename is customer_purchases, output the mode as complete and start
purchasebycustomerperhour.writeStream.format("memory").queryName("customer_purchases").outputMode("complete").start()

<pyspark.sql.streaming.query.StreamingQuery at 0x7ff3b0c96050>

In [19]:
#select all the columns with sql in customer_purchases table and store in customer variable
customer=spark.sql("select * from customer_purchases")
customer.show(5,False)

+----------+------+---------------+
|CustomerID|window|sum(total_cost)|
+----------+------+---------------+
+----------+------+---------------+



In [20]:
#load the csv files directory to extract/by-day/* and store in load_by_data variable and print the schema in 2 different ways
load_by_data.printSchema()
load_by_data.schema

root
 |-- InvoiceNo: string (nullable = true)
 |-- StockCode: string (nullable = true)
 |-- Description: string (nullable = true)
 |-- Quantity: integer (nullable = true)
 |-- InvoiceDate: timestamp (nullable = true)
 |-- UnitPrice: double (nullable = true)
 |-- CustomerID: double (nullable = true)
 |-- Country: string (nullable = true)



StructType([StructField('InvoiceNo', StringType(), True), StructField('StockCode', StringType(), True), StructField('Description', StringType(), True), StructField('Quantity', IntegerType(), True), StructField('InvoiceDate', TimestampType(), True), StructField('UnitPrice', DoubleType(), True), StructField('CustomerID', DoubleType(), True), StructField('Country', StringType(), True)])

In [21]:
#import date_format and col functions
#in preppedDataFrame fill null values with 0 and add new column day_of_week contains the format of "EEEE" to column InvoiceDate
#and reduce the partitions to 5
from pyspark.sql.functions import date_format,col
preppedDataFrame=load_by_data.na.fill(0).withColumn("day_of_week",date_format(col("InvoiceDate"),"EEEE")).coalesce(5)

In [22]:
#using preppedDataFrame create 2 variables
#one contains preppedDataFrame of InvoiceDate < '2011-07-01' and store in trainDataFrame varibale
#another contains preppedDataFrame of InvoiceDate >= '2011-07-01' and store in testDataFrame varibale
#check the count of trainDataFrame,testDataFrame

trainDataFrame=preppedDataFrame.where("InvoiceDate<'2011-07-01'")
testDataFrame=preppedDataFrame.where("InvoiceDate>='2011-07-01'")
trainDataFrame.count() #245903
testDataFrame.count() #296006

296006

In [23]:
#create a dataframe in rdd_df variable contians 1,2,3 using row
from pyspark.sql import Row
rdd_df=spark.sparkContext.parallelize([Row(1),Row(2),Row(3)]).toDF()
rdd_df.show()

+---+
| _1|
+---+
|  1|
|  2|
|  3|
+---+



In [24]:
#create a dataframe number_500 contains 500 numbers and add each value + 10 and show the result
number_500=spark.range(500).toDF("numbers2")
number_500.show(5)
number_500.select(number_500["numbers2"]+10).show(5)

+--------+
|numbers2|
+--------+
|       0|
|       1|
|       2|
|       3|
|       4|
+--------+
only showing top 5 rows

+---------------+
|(numbers2 + 10)|
+---------------+
|             10|
|             11|
|             12|
|             13|
|             14|
+---------------+
only showing top 5 rows



In [25]:
#range of 2 numbers collect the numbers and store in rows_collect and print rows_collect
rows_collect=spark.range(2).toDF("new_df")
rows_collect.collect()

[Row(new_df=0), Row(new_df=1)]

In [26]:
#load the json data and store in json_data variable and show the result
json_data=spark.read.format("json").option("header","true").option("inferSchema","true").json("2015-summary.json")
json_data.show(5)
json_data.printSchema()

+-----------------+-------------------+-----+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|
+-----------------+-------------------+-----+
|    United States|            Romania|   15|
|    United States|            Croatia|    1|
|    United States|            Ireland|  344|
|            Egypt|      United States|   15|
|    United States|              India|   62|
+-----------------+-------------------+-----+
only showing top 5 rows

root
 |-- DEST_COUNTRY_NAME: string (nullable = true)
 |-- ORIGIN_COUNTRY_NAME: string (nullable = true)
 |-- count: long (nullable = true)



In [27]:
#manually assign schema of to the json_data by loading and select only count column and store the result in a 
from pyspark.sql.types import StructType,StructField,StringType,LongType
schema=StructType([StructField("DEST_COUNTRY_NAME",StringType(),True),StructField("ORIGIN_COUNTRY_NAME",StringType(),True),StructField("count",LongType(),True)])
a=spark.read.format("json").schema(schema).option("header","true").load("2015-summary.json")
a.show(5)

+-----------------+-------------------+-----+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|
+-----------------+-------------------+-----+
|    United States|            Romania|   15|
|    United States|            Croatia|    1|
|    United States|            Ireland|  344|
|            Egypt|      United States|   15|
|    United States|              India|   62|
+-----------------+-------------------+-----+
only showing top 5 rows



In [28]:
#manually assign schema of to the json_data by loading and select only count column and store the result in a 
from pyspark.sql.types import StructType,StructField,StringType,LongType
schema=StructType([StructField("DEST_COUNTRY_NAME",StringType(),True),StructField("ORIGIN_COUNTRY_NAME",StringType(),True),StructField("count",LongType(),True)])
a=spark.read.format("json").schema(schema).load("2015-summary.json")
a.show(5)

+-----------------+-------------------+-----+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|
+-----------------+-------------------+-----+
|    United States|            Romania|   15|
|    United States|            Croatia|    1|
|    United States|            Ireland|  344|
|            Egypt|      United States|   15|
|    United States|              India|   62|
+-----------------+-------------------+-----+
only showing top 5 rows



In [29]:
from pyspark.sql.functions import col
a.columns
a.first()
a.select(col("count")).show(5)

+-----+
|count|
+-----+
|   15|
|    1|
|  344|
|   15|
|   62|
+-----+
only showing top 5 rows



In [30]:
#access only columns of json data and store in json_data and print the result
json_data=spark.read.format("json").load("2015-summary.json").columns
print(json_data)

['DEST_COUNTRY_NAME', 'ORIGIN_COUNTRY_NAME', 'count']


In [31]:
#create a row of values manually and show the result of 2nd index
from pyspark.sql import Row
manual_rows=([Row("name","no"),Row("sush",1),Row("dolly",2)])
manual_rows[2]

<Row('dolly', 2)>

In [32]:
#load the json data into json_data variable and create temp table df_table
json_data=spark.read.format("json").load("2015-summary.json")
json_data.createOrReplaceTempView("df_table")

In [33]:
#in manual_schema store schema(string,string,int) and df_data store random data 
#and create the dataframe in new_df variable with df_data and manual_schema and print the result of new_df
from pyspark.sql.types import StructField,StructType,StringType,LongType
manual_schema=StructType([StructField("user_name",StringType(),False),StructField("city",StringType(),False),StructField("num",LongType(),False)])
df_data=[Row("sush","Kompally",1),Row("dolly","kmr",2)]
new_df=spark.createDataFrame(df_data,manual_schema)
new_df.show()

+---------+--------+---+
|user_name|    city|num|
+---------+--------+---+
|     sush|Kompally|  1|
|    dolly|     kmr|  2|
+---------+--------+---+



In [34]:
#load the json data in json_data variable and select only "DEST_COUNTRY_NAME","ORIGIN_COUNTRY_NAME" and show the result
json_data=spark.read.format("json").load("2015-summary.json")
json_data.select(col("DEST_COUNTRY_NAME"),col("ORIGIN_COUNTRY_NAME")).show(5)

+-----------------+-------------------+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|
+-----------------+-------------------+
|    United States|            Romania|
|    United States|            Croatia|
|    United States|            Ireland|
|            Egypt|      United States|
|    United States|              India|
+-----------------+-------------------+
only showing top 5 rows



In [35]:
#load the json data in json_data variable and using col,column,expr select the column DEST_COUNTRY_NAME and show the result of 2 records
from pyspark.sql.functions import col,column,expr
json_data=spark.read.format("json").load("2015-summary.json")
json_data.select(col("DEST_COUNTRY_NAME"),column("DEST_COUNTRY_NAME"),expr("DEST_COUNTRY_NAME")).show(2)

+-----------------+-----------------+-----------------+
|DEST_COUNTRY_NAME|DEST_COUNTRY_NAME|DEST_COUNTRY_NAME|
+-----------------+-----------------+-----------------+
|    United States|    United States|    United States|
|    United States|    United States|    United States|
+-----------------+-----------------+-----------------+
only showing top 2 rows



In [36]:
#load the json data in json_data variable and select the column DEST_COUNTRY_NAME as destination with expr and show 2 records
json_data=spark.read.format("json").load("2015-summary.json")
json_data.select(col("DEST_COUNTRY_NAME").alias("destination")).show(2)
json_data.select(expr("DEST_COUNTRY_NAME").alias("destination")).show(2)
json_data.select(expr("DEST_COUNTRY_NAME as destination")).show(2)

+-------------+
|  destination|
+-------------+
|United States|
|United States|
+-------------+
only showing top 2 rows

+-------------+
|  destination|
+-------------+
|United States|
|United States|
+-------------+
only showing top 2 rows

+-------------+
|  destination|
+-------------+
|United States|
|United States|
+-------------+
only showing top 2 rows



In [37]:
#load the json data in json_data variable and select the column DEST_COUNTRY_NAME as dest with expr, again rename to destination1 and show 5 records
json_data=spark.read.format("json").load("2015-summary.json")
json_data.select(expr("DEST_COUNTRY_NAME as dest").alias("destination1")).show(5)

+-------------+
| destination1|
+-------------+
|United States|
|United States|
|United States|
|        Egypt|
|United States|
+-------------+
only showing top 5 rows



In [38]:
#load the json data in json_data variable and select the column DEST_COUNTRY_NAME as dest, DEST_COUNTRY_NAME with selectExpr, show 2 records
json_data=spark.read.format("json").load("2015-summary.json")
json_data.selectExpr("DEST_COUNTRY_NAME as dest","DEST_COUNTRY_NAME").show(2)

+-------------+-----------------+
|         dest|DEST_COUNTRY_NAME|
+-------------+-----------------+
|United States|    United States|
|United States|    United States|
+-------------+-----------------+
only showing top 2 rows



In [39]:
#load the json data in json_data variable and select all columns, if DEST_COUNTRY_NAME = ORIGIN_COUNTRY_NAME store in withincountry and show 5 records
json_data=spark.read.format("json").load("2015-summary.json")
json_data.selectExpr("*","(DEST_COUNTRY_NAME = ORIGIN_COUNTRY_NAME) as withinCountry").show(5)

json_data.withColumn("withinCountry",(col("DEST_COUNTRY_NAME") == col("ORIGIN_COUNTRY_NAME"))).show(5)


+-----------------+-------------------+-----+-------------+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|withinCountry|
+-----------------+-------------------+-----+-------------+
|    United States|            Romania|   15|        false|
|    United States|            Croatia|    1|        false|
|    United States|            Ireland|  344|        false|
|            Egypt|      United States|   15|        false|
|    United States|              India|   62|        false|
+-----------------+-------------------+-----+-------------+
only showing top 5 rows

+-----------------+-------------------+-----+-------------+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|withinCountry|
+-----------------+-------------------+-----+-------------+
|    United States|            Romania|   15|        false|
|    United States|            Croatia|    1|        false|
|    United States|            Ireland|  344|        false|
|            Egypt|      United States|   15|        false|
|    United Sta

In [40]:
#load the json data in json_data variable and select avg of count,count of distinct DEST_COUNTRY_NAME
json_data=spark.read.format("json").load("2015-summary.json")
json_data.selectExpr("avg(count)","count(distinct DEST_COUNTRY_NAME)").show()

+-----------+---------------------------------+
| avg(count)|count(DISTINCT DEST_COUNTRY_NAME)|
+-----------+---------------------------------+
|1770.765625|                              132|
+-----------+---------------------------------+



In [41]:
#select all columns of json_data and add new column as One stroing lit value of 1 and print 2 records
from pyspark.sql.functions import lit
json_data.withColumn("one",lit(1)).show(2)
json_data.select(expr("*"),lit(1).alias("one")).show(2)

+-----------------+-------------------+-----+---+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|one|
+-----------------+-------------------+-----+---+
|    United States|            Romania|   15|  1|
|    United States|            Croatia|    1|  1|
+-----------------+-------------------+-----+---+
only showing top 2 rows

+-----------------+-------------------+-----+---+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|one|
+-----------------+-------------------+-----+---+
|    United States|            Romania|   15|  1|
|    United States|            Croatia|    1|  1|
+-----------------+-------------------+-----+---+
only showing top 2 rows



In [42]:
#load the json data in json_data variable
#a variable contains data of adding new column name as new_column contains value of literal 1 and show 2 records
#b variable contains data of adding new column name as new_column contains value if DEST_COUNTRY_NAME = ORIGIN_COUNTRY_NAME and show 2 records
#c variable contains data of adding new column name as destination contains value of DEST_COUNTRY_NAME and show 2 records
#d variable contains data of renaming column DEST_COUNTRY_NAME to new_column destination and show 2 records
a=json_data.withColumn("new_column",lit(1))
a.show(2)
b=json_data.withColumn("new_column",col("DEST_COUNTRY_NAME") == col("ORIGIN_COUNTRY_NAME"))
b.show(2)
c=json_data.withColumn("destination",col("DEST_COUNTRY_NAME"))
c.show(2)
d=json_data.withColumnRenamed("DEST_COUNTRY_NAME","destination")
d.show(2)

+-----------------+-------------------+-----+----------+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|new_column|
+-----------------+-------------------+-----+----------+
|    United States|            Romania|   15|         1|
|    United States|            Croatia|    1|         1|
+-----------------+-------------------+-----+----------+
only showing top 2 rows

+-----------------+-------------------+-----+----------+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|new_column|
+-----------------+-------------------+-----+----------+
|    United States|            Romania|   15|     false|
|    United States|            Croatia|    1|     false|
+-----------------+-------------------+-----+----------+
only showing top 2 rows

+-----------------+-------------------+-----+-------------+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|  destination|
+-----------------+-------------------+-----+-------------+
|    United States|            Romania|   15|United States|
|    United States|       

In [43]:
#load the json data in json_data variable
#a variable contains data of adding new column name as "This Long Column-name" contains value of ORIGIN_COUNTRY_NAME and show 2 records
#b variable contains data of selecting new column name as "This Long Column-name" and selecting "This Long Column-name" as new_column and show 2 records
#c variable contains data of selecting new column name as "This Long Column-name" and columns and print the result
a=json_data.withColumn("This Long Column-name",col("ORIGIN_COUNTRY_NAME"))
a.show(2)
b=a.select("This Long Column-name",col("This Long Column-name").alias("new_column"))
b.show(2)
c=b.select(col("new_column").alias("This Long Column-name")).columns
print(c)

+-----------------+-------------------+-----+---------------------+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|This Long Column-name|
+-----------------+-------------------+-----+---------------------+
|    United States|            Romania|   15|              Romania|
|    United States|            Croatia|    1|              Croatia|
+-----------------+-------------------+-----+---------------------+
only showing top 2 rows

+---------------------+----------+
|This Long Column-name|new_column|
+---------------------+----------+
|              Romania|   Romania|
|              Croatia|   Croatia|
+---------------------+----------+
only showing top 2 rows

['This Long Column-name']


In [44]:
#load the json data in json_data variable and delete the column DEST_COUNTRY_NAME and show 2 records
json_data.drop(col("DEST_COUNTRY_NAME")).show(2)

+-------------------+-----+
|ORIGIN_COUNTRY_NAME|count|
+-------------------+-----+
|            Romania|   15|
|            Croatia|    1|
+-------------------+-----+
only showing top 2 rows



In [45]:
#load the json data in json_data variable and 
#a variable contains data of adding new column name as "new_column" contains value of "count" column and show 2 records,schema
#b variable contains data of a and selecting new_column column name and change the type to int and show the schema
a=json_data.withColumn("new_column",col("count"))
a.show(2)
a.schema
b=a.withColumn("new_column",col("count").cast("int"))
b.schema

c=a.select(col("count").cast("int"))
c.schema

+-----------------+-------------------+-----+----------+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|new_column|
+-----------------+-------------------+-----+----------+
|    United States|            Romania|   15|        15|
|    United States|            Croatia|    1|         1|
+-----------------+-------------------+-----+----------+
only showing top 2 rows



StructType([StructField('count', IntegerType(), True)])

In [46]:
#load the json data in json_data variable and 
#a variable contains data of count<2 with where condition and show 4 records
#b variable contains data of count<2 with filter condition and show 4 records
#c variable contains data of count<2,ORIGIN_COUNTRY_NAME!=Croatia with where condition and show 4 records
#d variable contains data of "DEST_COUNTRY_NAME","ORIGIN_COUNTRY_NAME" distinct and show count of records
#e variable contains data of "DEST_COUNTRY_NAME" distinct and show count of records

json_data=spark.read.format("json").load("2015-summary.json")
a=json_data.where(col("count")<2)
a.show(4)
b=json_data.filter(col("count")<2)
b.show(4)
c=json_data.where(col("count")<2).where(col("ORIGIN_COUNTRY_NAME")!="Croatia")
c.show(4)
d=json_data.select(col("DEST_COUNTRY_NAME"),col("ORIGIN_COUNTRY_NAME")).distinct().count()
print(d)
e=json_data.select(col("DEST_COUNTRY_NAME")).distinct().count()
print(e)

+-----------------+-------------------+-----+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|
+-----------------+-------------------+-----+
|    United States|            Croatia|    1|
|    United States|          Singapore|    1|
|          Moldova|      United States|    1|
|            Malta|      United States|    1|
+-----------------+-------------------+-----+
only showing top 4 rows

+-----------------+-------------------+-----+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|
+-----------------+-------------------+-----+
|    United States|            Croatia|    1|
|    United States|          Singapore|    1|
|          Moldova|      United States|    1|
|            Malta|      United States|    1|
+-----------------+-------------------+-----+
only showing top 4 rows

+-----------------+-------------------+-----+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|
+-----------------+-------------------+-----+
|    United States|          Singapore|    1|
|          Moldova|      Unite

In [47]:
#load the json data in json_data variable, with sample split the data into 50% with constant 5 with replacement as false and count the result
json_data=spark.read.format("json").load("2015-summary.json")
seed=5
withReplacement=False
fraction=0.5
json_data.sample(withReplacement,fraction,seed).count()


138

In [48]:
#import the json data in json_data variable and split the data into 25%,75% with seed 5 and check count if its true or false
a=json_data.randomSplit([0.25,0.75],5)
print(a[0].count(),a[1].count())

71 185


In [49]:
#load the json data into json_data variable and store the schema of json_data in schema variable
#add new_rows contains ("sush","asara",2),("dolly","asara",4)
#parallelize the new_rows store in parallelize variable
#create a dataframe with parallelize ,schema in new_df variable and show the result of new_df
#json_data union new_df where count=2,dest_country_name!=origin_country_name and show the result
json_data=spark.read.format("json").load("2015-summary.json")
schema=json_data.schema
new_rows=([Row("sush","asara",2),Row("dolly","asara",4)])
parallelize=spark.sparkContext.parallelize(new_rows)
new_df=spark.createDataFrame(new_rows,schema)
json_data.union(new_df).where(col("count")==2).where("dest_country_name!=origin_country_name").show()

+-----------------+-------------------+-----+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|
+-----------------+-------------------+-----+
|          Liberia|      United States|    2|
|          Hungary|      United States|    2|
|    United States|            Vietnam|    2|
|         Malaysia|      United States|    2|
|          Croatia|      United States|    2|
|    United States|            Liberia|    2|
|    United States|              Malta|    2|
|          Georgia|      United States|    2|
|            Niger|      United States|    2|
|    United States|          Indonesia|    2|
|        Greenland|      United States|    2|
|             sush|              asara|    2|
+-----------------+-------------------+-----+



In [50]:
#load the json data in json_data variable
#in a variable sort the result of count with sort and show 5 records
#in b variable sort the result of count,DEST_COUNTRY_NAME with orderBy and show 5 records
a=json_data.sort("count")
a.show(5)
b=json_data.orderBy("count","DEST_COUNTRY_NAME")
b.show()

+--------------------+-------------------+-----+
|   DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|
+--------------------+-------------------+-----+
|               Malta|      United States|    1|
|Saint Vincent and...|      United States|    1|
|       United States|            Croatia|    1|
|       United States|          Gibraltar|    1|
|       United States|          Singapore|    1|
+--------------------+-------------------+-----+
only showing top 5 rows

+--------------------+-------------------+-----+
|   DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|
+--------------------+-------------------+-----+
|        Burkina Faso|      United States|    1|
|       Cote d'Ivoire|      United States|    1|
|              Cyprus|      United States|    1|
|            Djibouti|      United States|    1|
|           Indonesia|      United States|    1|
|                Iraq|      United States|    1|
|              Kosovo|      United States|    1|
|               Malta|      United States|  

In [51]:
#load the json data in json_data variable
#sort the count values in desc with orderby in a variable and show 2 records
#sort the count values in desc and dest_country_name in asc with orderby in b variable and show 2 records
from pyspark.sql.functions import asc,desc
a=json_data.orderBy(desc("count"))
a.show(2)
b=json_data.orderBy(desc("count"),asc("dest_country_name"))
b=json_data.orderBy(asc("dest_country_name"),desc("count"))
b.show(2)

+-----------------+-------------------+------+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME| count|
+-----------------+-------------------+------+
|    United States|      United States|370002|
|    United States|             Canada|  8483|
+-----------------+-------------------+------+
only showing top 2 rows

+-----------------+-------------------+-----+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|
+-----------------+-------------------+-----+
|          Algeria|      United States|    4|
|           Angola|      United States|   15|
+-----------------+-------------------+-----+
only showing top 2 rows



In [52]:
#load the json data in json_data variable and using limit with 5 show the result
#in a variable sort the count desc and using limit with 5 show the result
#in b variable repartition json_data to 5 and check how many partitioned it got
#in c variable repartition json_data based on column ORIGIN_COUNTRY_NAME and check how many partitioned it got
#in d variable repartition json_data based on column DEST_COUNTRY_NAME with 4 and check how many partitioned it got
json_data=spark.read.format("json").load("2015-summary.json")
json_data.show(5)
a=json_data.sort(desc("count")).limit(5)
a.show()
b=json_data.repartition(5)
b.rdd.getNumPartitions()
c=json_data.repartition(col("ORIGIN_COUNTRY_NAME"))
c.rdd.getNumPartitions()
d=json_data.repartition(4,col("DEST_COUNTRY_NAME"))
d.rdd.getNumPartitions()

+-----------------+-------------------+-----+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|
+-----------------+-------------------+-----+
|    United States|            Romania|   15|
|    United States|            Croatia|    1|
|    United States|            Ireland|  344|
|            Egypt|      United States|   15|
|    United States|              India|   62|
+-----------------+-------------------+-----+
only showing top 5 rows

+-----------------+-------------------+------+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME| count|
+-----------------+-------------------+------+
|    United States|      United States|370002|
|    United States|             Canada|  8483|
|           Canada|      United States|  8399|
|    United States|             Mexico|  7187|
|           Mexico|      United States|  7140|
+-----------------+-------------------+------+



4

In [53]:
#load the json data in json_data variable and print the result with show,take,show(value,False),collect
json_data.show(5)
json_data.take(5)
json_data.show(2,False)
json_data.collect()

+-----------------+-------------------+-----+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|
+-----------------+-------------------+-----+
|    United States|            Romania|   15|
|    United States|            Croatia|    1|
|    United States|            Ireland|  344|
|            Egypt|      United States|   15|
|    United States|              India|   62|
+-----------------+-------------------+-----+
only showing top 5 rows

+-----------------+-------------------+-----+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|
+-----------------+-------------------+-----+
|United States    |Romania            |15   |
|United States    |Croatia            |1    |
+-----------------+-------------------+-----+
only showing top 2 rows



[Row(DEST_COUNTRY_NAME='United States', ORIGIN_COUNTRY_NAME='Romania', count=15),
 Row(DEST_COUNTRY_NAME='United States', ORIGIN_COUNTRY_NAME='Croatia', count=1),
 Row(DEST_COUNTRY_NAME='United States', ORIGIN_COUNTRY_NAME='Ireland', count=344),
 Row(DEST_COUNTRY_NAME='Egypt', ORIGIN_COUNTRY_NAME='United States', count=15),
 Row(DEST_COUNTRY_NAME='United States', ORIGIN_COUNTRY_NAME='India', count=62),
 Row(DEST_COUNTRY_NAME='United States', ORIGIN_COUNTRY_NAME='Singapore', count=1),
 Row(DEST_COUNTRY_NAME='United States', ORIGIN_COUNTRY_NAME='Grenada', count=62),
 Row(DEST_COUNTRY_NAME='Costa Rica', ORIGIN_COUNTRY_NAME='United States', count=588),
 Row(DEST_COUNTRY_NAME='Senegal', ORIGIN_COUNTRY_NAME='United States', count=40),
 Row(DEST_COUNTRY_NAME='Moldova', ORIGIN_COUNTRY_NAME='United States', count=1),
 Row(DEST_COUNTRY_NAME='United States', ORIGIN_COUNTRY_NAME='Sint Maarten', count=325),
 Row(DEST_COUNTRY_NAME='United States', ORIGIN_COUNTRY_NAME='Marshall Islands', count=39),
 

In [54]:
#load the csv retail data in retail_data variable and print the result with show and 
#create a temp table retail_temp_table and print schema of retail_data
retail_data=spark.read.format("csv").option("header","true").option("inferSchema","true").csv("2010-12-01.csv")
retail_data.show(5)
retail_data.createOrReplaceTempView("retail_temp_table")

+---------+---------+--------------------+--------+-------------------+---------+----------+--------------+
|InvoiceNo|StockCode|         Description|Quantity|        InvoiceDate|UnitPrice|CustomerID|       Country|
+---------+---------+--------------------+--------+-------------------+---------+----------+--------------+
|   536365|   85123A|WHITE HANGING HEA...|       6|2010-12-01 08:26:00|     2.55|   17850.0|United Kingdom|
|   536365|    71053| WHITE METAL LANTERN|       6|2010-12-01 08:26:00|     3.39|   17850.0|United Kingdom|
|   536365|   84406B|CREAM CUPID HEART...|       8|2010-12-01 08:26:00|     2.75|   17850.0|United Kingdom|
|   536365|   84029G|KNITTED UNION FLA...|       6|2010-12-01 08:26:00|     3.39|   17850.0|United Kingdom|
|   536365|   84029E|RED WOOLLY HOTTIE...|       6|2010-12-01 08:26:00|     3.39|   17850.0|United Kingdom|
+---------+---------+--------------------+--------+-------------------+---------+----------+--------------+
only showing top 5 rows



In [55]:
#import lit and select the 5,0.5,five
from pyspark.sql.functions import lit
retail_data.select(lit(5),lit(0.5),lit("five"))

DataFrame[5: int, 0.5: double, five: string]

In [56]:
# from retail_data select invoiceno,description where InvoiceNo not equal to 536365 and show the result
from pyspark.sql.functions import col
a=retail_data.select(col("invoiceno"),col("description")).where(col("InvoiceNo")!=536365)
a.show(5)

+---------+--------------------+
|invoiceno|         description|
+---------+--------------------+
|   536366|HAND WARMER UNION...|
|   536366|HAND WARMER RED P...|
|   536367|ASSORTED COLOUR B...|
|   536367|POPPY'S PLAYHOUSE...|
|   536367|POPPY'S PLAYHOUSE...|
+---------+--------------------+
only showing top 5 rows



In [57]:
#load the csv retail data in retail_data variable and print the result with show and 
#in pricefilter variable check UnitPrice is grater tan 600
#in descriptionfilter variable check Description col contains POSTAGE and greater than or equal to 1 with instr
#with reatial_data check StockCode is dot with == and pricefilter or descriptionfilter and show
#with reatial_data check StockCode is dot with isin and pricefilter or descriptionfilter and show
from pyspark.sql.functions import instr

retail_data=spark.read.format("csv").option("header","true").option("inferSchema","true").load("2010-12-01.csv")
pricefilter=col("UnitPrice")>600
descriptionfilter=instr(col("Description"),"POSTAGE") >=1
retail_data.where(col("StockCode")=="DOT").where(pricefilter | descriptionfilter).show(5)


retail_data.where((col("StockCode")=="DOT") & (pricefilter | descriptionfilter)).show(5)

+---------+---------+--------------+--------+-------------------+---------+----------+--------------+
|InvoiceNo|StockCode|   Description|Quantity|        InvoiceDate|UnitPrice|CustomerID|       Country|
+---------+---------+--------------+--------+-------------------+---------+----------+--------------+
|   536544|      DOT|DOTCOM POSTAGE|       1|2010-12-01 14:32:00|   569.77|      NULL|United Kingdom|
|   536592|      DOT|DOTCOM POSTAGE|       1|2010-12-01 17:06:00|   607.49|      NULL|United Kingdom|
+---------+---------+--------------+--------+-------------------+---------+----------+--------------+

+---------+---------+--------------+--------+-------------------+---------+----------+--------------+
|InvoiceNo|StockCode|   Description|Quantity|        InvoiceDate|UnitPrice|CustomerID|       Country|
+---------+---------+--------------+--------+-------------------+---------+----------+--------------+
|   536544|      DOT|DOTCOM POSTAGE|       1|2010-12-01 14:32:00|   569.77|      

In [58]:
#load the csv retail data in retail_data variable and print the result with show and 
#in pricefilter variable check UnitPrice is grater tan 600
#in stockcodefilter variable check stockcodefilter contains dot init
#in descriptionfilter variable check Description col contains POSTAGE and greater than or equal to 1 with instr
#with reatial_data add new column isexpensive where  descriptionfilter and (pricefilter or stockcodefilter) in checking_filters variable
#and show if isexpensive is true

pricefilter=col("UnitPrice") > 600
stockcodefilter=col("StockCode").isin("DOT")
descriptionfilter=instr(col("Description"),"POSTAGE")>=1
retail_data.withColumn("isexpensive",descriptionfilter & (pricefilter | stockcodefilter)).where("isexpensive").show(5)

+---------+---------+--------------+--------+-------------------+---------+----------+--------------+-----------+
|InvoiceNo|StockCode|   Description|Quantity|        InvoiceDate|UnitPrice|CustomerID|       Country|isexpensive|
+---------+---------+--------------+--------+-------------------+---------+----------+--------------+-----------+
|   536544|      DOT|DOTCOM POSTAGE|       1|2010-12-01 14:32:00|   569.77|      NULL|United Kingdom|       true|
|   536592|      DOT|DOTCOM POSTAGE|       1|2010-12-01 17:06:00|   607.49|      NULL|United Kingdom|       true|
+---------+---------+--------------+--------+-------------------+---------+----------+--------------+-----------+



In [59]:
#load the csv retail data in retail_data variable and print the result with show and 
#add new column isexpensive where not(unitprice) is less than or equal to 250 and show if isexpensive is true
from pyspark.sql.functions import expr
retail_data.withColumn("isexpensive",expr("not unitprice<=250")).where("isexpensive").show(5)

+---------+---------+--------------+--------+-------------------+---------+----------+--------------+-----------+
|InvoiceNo|StockCode|   Description|Quantity|        InvoiceDate|UnitPrice|CustomerID|       Country|isexpensive|
+---------+---------+--------------+--------+-------------------+---------+----------+--------------+-----------+
|   536544|      DOT|DOTCOM POSTAGE|       1|2010-12-01 14:32:00|   569.77|      NULL|United Kingdom|       true|
|   536592|      DOT|DOTCOM POSTAGE|       1|2010-12-01 17:06:00|   607.49|      NULL|United Kingdom|       true|
+---------+---------+--------------+--------+-------------------+---------+----------+--------------+-----------+



In [60]:
a=retail_data.where(col("Description").eqNullSafe("hello"))
a.withColumn("new",col("Description").isin("hello")).show(5)

+---------+---------+-----------+--------+-----------+---------+----------+-------+---+
|InvoiceNo|StockCode|Description|Quantity|InvoiceDate|UnitPrice|CustomerID|Country|new|
+---------+---------+-----------+--------+-----------+---------+----------+-------+---+
+---------+---------+-----------+--------+-----------+---------+----------+-------+---+



In [61]:
#load the csv retail data in retail_data variable and 
#using pow function ((Quantity*UnitPrice) ^2 )+5 store in fabricated_quantity variable
#and show the result with just select statement of 2 records of columns CustomerID and its fabricated_quantity as "actual"
#and show the result with just selectexpr statement of 2 records of columns CustomerID and its fabricated_quantity as "actual"
from pyspark.sql.functions import pow
fabricated_quantity=pow((col("Quantity")*col("UnitPrice")),2)+5
retail_data.select(col("CustomerID"),fabricated_quantity.alias("actual")).show(2)

retail_data.selectExpr("CustomerID","POWER(Quantity*UnitPrice,2)+5 as actual").show(2)

+----------+------------------+
|CustomerID|            actual|
+----------+------------------+
|   17850.0|239.08999999999997|
|   17850.0|          418.7156|
+----------+------------------+
only showing top 2 rows

+----------+------------------+
|CustomerID|            actual|
+----------+------------------+
|   17850.0|239.08999999999997|
|   17850.0|          418.7156|
+----------+------------------+
only showing top 2 rows



In [62]:
#load the csv retail data in retail_data variable and 
#with round(lit(2.5)),bround(lit(2.5)) check what does the output
from pyspark.sql.functions import lit,round,bround
retail_data=spark.read.format("csv").option("header","true").option("inferSchema","true").csv("2010-12-01.csv")
retail_data.select(round(lit(2.5)),bround(lit(2.5))).show(1)

+-------------+--------------+
|round(2.5, 0)|bround(2.5, 0)|
+-------------+--------------+
|          3.0|           2.0|
+-------------+--------------+
only showing top 1 row



In [63]:
#load the csv retail data in retail_data variable and 
#with corr between columns UnitPrice,Quantity check how much its linear relationship 
#with describe function check the retail_data of count,min,max,mean,stddev
from pyspark.sql.functions import corr
retail_data.select(corr("UnitPrice","Quantity")).show(1)
retail_data.stat.corr("UnitPrice","Quantity")

+-------------------------+
|corr(UnitPrice, Quantity)|
+-------------------------+
|     -0.04112314436835552|
+-------------------------+



-0.04112314436835552

In [64]:
retail_data.describe().show()

+-------+-----------------+------------------+--------------------+------------------+------------------+------------------+--------------+
|summary|        InvoiceNo|         StockCode|         Description|          Quantity|         UnitPrice|        CustomerID|       Country|
+-------+-----------------+------------------+--------------------+------------------+------------------+------------------+--------------+
|  count|             3108|              3108|                3098|              3108|              3108|              1968|          3108|
|   mean| 536516.684944841|27834.304044117645|                NULL| 8.627413127413128| 4.151946589446603|15661.388719512195|          NULL|
| stddev|72.89447869788873|17407.897548583845|                NULL|26.371821677029203|15.638659854603892|1854.4496996893627|          NULL|
|    min|           536365|             10002| 4 PURPLE FLOCK D...|               -24|               0.0|           12431.0|     Australia|
|    max|          C

In [65]:
#colname variable contains unitprice,quantileprobs variable contains list of 0.5 value, relerror contains 0.05 value
#check .stat.approxQuantile with 3 varibales with retail_data
#check .stat.crosstab with "StockCode","Quantity" varibales with retail_data
#check .stat.freqItems with "StockCode","Quantity" varibales with retail_data
#with monotonically_increasing_id select and show 2 records
from pyspark.sql.functions import col,monotonically_increasing_id
retail_data=spark.read.option("header","true").option("inferSchema","true").csv("2010-12-01.csv")
colname="UnitPrice"
quantileprobs=[0.5]
relerror=0.05
retail_data.stat.approxQuantile(colname,quantileprobs,relerror)
retail_data.stat.crosstab("StockCode","Quantity").show(2)
retail_data.stat.freqItems(["StockCode","Quantity"]).show(2)
retail_data.select(monotonically_increasing_id(),col("StockCode")).show(2)

+------------------+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+
|StockCode_Quantity| -1|-10|-12| -2|-24| -3| -4| -5| -6| -7|  1| 10|100| 11| 12|120|128| 13| 14|144| 15| 16| 17| 18| 19|192|  2| 20|200| 21|216| 22| 23| 24| 25|252| 27| 28|288|  3| 30| 32| 33| 34| 36|384|  4| 40|432| 47| 48|480|  5| 50| 56|  6| 60|600| 64|  7| 70| 72|  8| 80|  9| 96|
+------------------+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+
|            84029E|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  3|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  1|  0|  0|  0|  0| 

In [66]:
#load the csv retail data in retail_data variable and 
#use function initcap,upper,lower for description variable and show result
from pyspark.sql.functions import initcap,upper,lower
retail_data=spark.read.option("header","true").option("inferSchema","true").csv("2010-12-01.csv")
retail_data.select("Description",initcap("Description"),upper("Description"),lower("Description")).show(2,False)

+----------------------------------+----------------------------------+----------------------------------+----------------------------------+
|Description                       |initcap(Description)              |upper(Description)                |lower(Description)                |
+----------------------------------+----------------------------------+----------------------------------+----------------------------------+
|WHITE HANGING HEART T-LIGHT HOLDER|White Hanging Heart T-light Holder|WHITE HANGING HEART T-LIGHT HOLDER|white hanging heart t-light holder|
|WHITE METAL LANTERN               |White Metal Lantern               |WHITE METAL LANTERN               |white metal lantern               |
+----------------------------------+----------------------------------+----------------------------------+----------------------------------+
only showing top 2 rows



In [67]:
#using ltrim,rtrim,rpad,lpad,trim select ("    HELLO    ") and check the result
from pyspark.sql.functions import ltrim,rtrim,rpad,lpad,lit
retail_data.select(lit("    HELLO    "),ltrim(lit("    HELLO    ")).alias("ltrim")).show(1)
retail_data.select(lit("    HELLO    "),rtrim(lit("    HELLO    ")).alias("ltrim")).show(1)
retail_data.select(lit("    HELLO    "),rpad(lit("    HELLO    "),14,"x").alias("rpad")).show(1)
retail_data.select(lit("    HELLO    "),lpad(lit("    HELLO    "),14,"x").alias("lpad")).show(1)
retail_data.select(lit("    HELLO    "),lpad(lit("    HELLO    "),5,"x").alias("lpad")).show(1)

+-------------+---------+
|    HELLO    |    ltrim|
+-------------+---------+
|    HELLO    |HELLO    |
+-------------+---------+
only showing top 1 row

+-------------+---------+
|    HELLO    |    ltrim|
+-------------+---------+
|    HELLO    |    HELLO|
+-------------+---------+
only showing top 1 row

+-------------+--------------+
|    HELLO    |          rpad|
+-------------+--------------+
|    HELLO    |    HELLO    x|
+-------------+--------------+
only showing top 1 row

+-------------+--------------+
|    HELLO    |          lpad|
+-------------+--------------+
|    HELLO    |x    HELLO    |
+-------------+--------------+
only showing top 1 row

+-------------+-----+
|    HELLO    | lpad|
+-------------+-----+
|    HELLO    |    H|
+-------------+-----+
only showing top 1 row



In [68]:
#using regexp_replace wherever col description contains BLACK|WHITE|RED|GREEN|BLUE then replaced with sush
#using translate wherever hanging comes replace with 1234
from pyspark.sql.functions import regexp_replace,regexp_extract,translate
retail_data=spark.read.option("header","true").option("inferSchema","true").csv("2010-12-01.csv")
string_val="BLACK|WHITE|RED|GREEN|BLUE"
retail_data.select("Description",regexp_replace("Description",string_val,"sush")).show(2)
retail_data.select("Description",translate("Description","HANGING","1234")).show(2)

+--------------------+----------------------------------------------------------------+
|         Description|regexp_replace(Description, BLACK|WHITE|RED|GREEN|BLUE, sush, 1)|
+--------------------+----------------------------------------------------------------+
|WHITE HANGING HEA...|                                            sush HANGING HEAR...|
| WHITE METAL LANTERN|                                              sush METAL LANTERN|
+--------------------+----------------------------------------------------------------+
only showing top 2 rows

+--------------------+-------------------------------------+
|         Description|translate(Description, HANGING, 1234)|
+--------------------+-------------------------------------+
|WHITE HANGING HEA...|                 W1TE 123434 1E2RT...|
| WHITE METAL LANTERN|                   W1TE MET2L L23TER3|
+--------------------+-------------------------------------+
only showing top 2 rows



In [69]:
#using regexp_extract wherever col description contains BLACK|WHITE|RED|GREEN|BLUE then extract the first
# For each row, it returns the extracted color (if any) along with the original "Description" value and then displays the first 5 result
retail_data.select("Description",regexp_extract("Description","(BLACK|WHITE|RED|GREEN|BLUE)",0)).show(5,False)

+-----------------------------------+------------------------------------------------------------+
|Description                        |regexp_extract(Description, (BLACK|WHITE|RED|GREEN|BLUE), 0)|
+-----------------------------------+------------------------------------------------------------+
|WHITE HANGING HEART T-LIGHT HOLDER |WHITE                                                       |
|WHITE METAL LANTERN                |WHITE                                                       |
|CREAM CUPID HEARTS COAT HANGER     |                                                            |
|KNITTED UNION FLAG HOT WATER BOTTLE|                                                            |
|RED WOOLLY HOTTIE WHITE HEART.     |RED                                                         |
+-----------------------------------+------------------------------------------------------------+
only showing top 5 rows



In [70]:
#contains_black variable check col(Description) contains black,contains_white variable check col(Description) contains white
#add new column to retail_data haswhiteorblack with or condition check it contains black or white and that is true and select only description
from pyspark.sql.functions import instr
contains_black=instr(col("Description"),"black") >=1
contains_white=instr(col("Description"),"WHITE") >=1
retail_data.withColumn("haswhiteorblack",contains_black | contains_white).select("haswhiteorblack","Description").show(5,False)

+---------------+-----------------------------------+
|haswhiteorblack|Description                        |
+---------------+-----------------------------------+
|true           |WHITE HANGING HEART T-LIGHT HOLDER |
|true           |WHITE METAL LANTERN                |
|false          |CREAM CUPID HEARTS COAT HANGER     |
|false          |KNITTED UNION FLAG HOT WATER BOTTLE|
|true           |RED WOOLLY HOTTIE WHITE HEART.     |
+---------------+-----------------------------------+
only showing top 5 rows



In [71]:
#create a list simplecolors=["black","white","red","blue","green"]
#selectedcolumns varibale is an empty list
#create a function colo_locator 
#you have a DataFrame named retail_data with a column Description containing text descriptions of retail items. 
#You want to create new boolean columns indicating the presence of specific colors in the Description column with is_red,is_black etc
#After that, you wish to filter and display the rows where the Description contains either "white" or "red".
from pyspark.sql.functions import locate,col,expr
simplecolors=["black","white","red","blue","green"]
selectedcolumns=[]

def color_locator(column,color_string):
    return locate(color_string.upper(),column).cast("boolean").alias("is_"+color_string)
for i in simplecolors:
    a=color_locator(retail_data.Description,i)
    selectedcolumns.append(a)
print(selectedcolumns)
selectedcolumns.append(col("Description"))
retail_data.select(*selectedcolumns).where(expr("is_white or is_red")).show(3,False)


[Column<'CAST(locate(BLACK, Description, 1) AS BOOLEAN) AS is_black'>, Column<'CAST(locate(WHITE, Description, 1) AS BOOLEAN) AS is_white'>, Column<'CAST(locate(RED, Description, 1) AS BOOLEAN) AS is_red'>, Column<'CAST(locate(BLUE, Description, 1) AS BOOLEAN) AS is_blue'>, Column<'CAST(locate(GREEN, Description, 1) AS BOOLEAN) AS is_green'>]
+--------+--------+------+-------+--------+----------------------------------+
|is_black|is_white|is_red|is_blue|is_green|Description                       |
+--------+--------+------+-------+--------+----------------------------------+
|false   |true    |false |false  |false   |WHITE HANGING HEART T-LIGHT HOLDER|
|false   |true    |false |false  |false   |WHITE METAL LANTERN               |
|false   |true    |true  |false  |false   |RED WOOLLY HOTTIE WHITE HEART.    |
+--------+--------+------+-------+--------+----------------------------------+
only showing top 3 rows



In [72]:
#create a dataframe datadf of range(10) and adding two columns today column contains current_date and now column contains current_timestamp
#create a temp table "datatable" with datadf
#print the schema of datadf
from pyspark.sql.functions import current_date,current_timestamp
datadf=spark.range(10).withColumn("today",current_date()).withColumn("now",current_timestamp())
datadf.createOrReplaceTempView("datatable")
datadf.schema
datadf.show(11,False)
datadf.printSchema()

+---+----------+--------------------------+
|id |today     |now                       |
+---+----------+--------------------------+
|0  |2025-05-19|2025-05-19 16:50:43.880564|
|1  |2025-05-19|2025-05-19 16:50:43.880564|
|2  |2025-05-19|2025-05-19 16:50:43.880564|
|3  |2025-05-19|2025-05-19 16:50:43.880564|
|4  |2025-05-19|2025-05-19 16:50:43.880564|
|5  |2025-05-19|2025-05-19 16:50:43.880564|
|6  |2025-05-19|2025-05-19 16:50:43.880564|
|7  |2025-05-19|2025-05-19 16:50:43.880564|
|8  |2025-05-19|2025-05-19 16:50:43.880564|
|9  |2025-05-19|2025-05-19 16:50:43.880564|
+---+----------+--------------------------+

root
 |-- id: long (nullable = false)
 |-- today: date (nullable = false)
 |-- now: timestamp (nullable = false)



In [73]:
#import date_add,date_sub and with existing datadf dataframe ad 5 days to today column and subtract 5 days to today column and show
from pyspark.sql.functions import date_add,date_sub
datadf.select(date_add("today",5),date_sub("today",5)).show(2)

+------------------+------------------+
|date_add(today, 5)|date_sub(today, 5)|
+------------------+------------------+
|        2025-05-24|        2025-05-14|
|        2025-05-24|        2025-05-14|
+------------------+------------------+
only showing top 2 rows



In [74]:
#import 3 functions datediff,months_between,to_date
#with existing datadf dataframe add new column week_age contains data of subtract 7 days of today column and checkdiff with datediff and show diff
#with existing datadf dataframe lit("2016-01-01"),lit("2017-05-22") convert the 2 lit into date with to_date and start_date and end_date columns
#select months_between diff of 2 columns of start_date and end_date columns
from pyspark.sql.functions import datediff,months_between,to_date,lit
datadf.withColumn("week_age",date_sub("today",7)).select(datediff("today","week_age")).show(1)
datadf.withColumn("start_date",to_date(lit("2016-01-01"))).withColumn("end_date",to_date(lit("2017-05-22"))) \
        .select(months_between("start_date","end_date")).show(2)

+-------------------------+
|datediff(today, week_age)|
+-------------------------+
|                        7|
+-------------------------+
only showing top 1 row

+------------------------------------------+
|months_between(start_date, end_date, true)|
+------------------------------------------+
|                              -16.67741935|
|                              -16.67741935|
+------------------------------------------+
only showing top 2 rows



In [75]:
#create a dataframe with range(5) and add new column "date" contains value of "2000-10-16" and while select convert into str to date with to_date
#with existing datadf dataframe select "2000-10-16","2000-16-10" by converting into date with to_date

df=spark.range(5).withColumn("date",lit("2000-10-16")).select(to_date("date"))
df.select(to_date(lit("2000-10-16")),to_date(lit("2000-16-10"))).show(1)

+-------------------+-------------------+
|to_date(2000-10-16)|to_date(2000-16-10)|
+-------------------+-------------------+
|         2000-10-16|               NULL|
+-------------------+-------------------+
only showing top 1 row



In [76]:
#import to_timestamp function 
#in dateFormat variable store the format 'yyyy-dd-MM'
#create dataframe range(1) and
#selectint 2 columns with to_date 1 is "first" contains value "2000-16-10" and "second" contains value "2000-20-12"
#with to_timestamp select first and second columns
from pyspark.sql.functions import to_timestamp,col
dateFormat="yyyy-dd-MM"
df=spark.range(1).select(to_date(lit("2000-16-10"),dateFormat).alias("first"),to_date(lit("2000-20-12"),dateFormat).alias("second"))
df.show(1)
df.select(to_timestamp("first"),to_timestamp("second")).show(1)
df.filter(col("first")>col("second")).show(1)
df.filter(col("first")<"2000-10-17").show(1)

+----------+----------+
|     first|    second|
+----------+----------+
|2000-10-16|2000-12-20|
+----------+----------+

+-------------------+--------------------+
|to_timestamp(first)|to_timestamp(second)|
+-------------------+--------------------+
|2000-10-16 00:00:00| 2000-12-20 00:00:00|
+-------------------+--------------------+

+-----+------+
|first|second|
+-----+------+
+-----+------+

+----------+----------+
|     first|    second|
+----------+----------+
|2000-10-16|2000-12-20|
+----------+----------+



In [77]:
#in df variable read the 2010-12-01.csv data and check the CustomerID column contains null or not
from pyspark.sql.functions import coalesce
df=spark.read.format("csv").option("header","true").option("inferSchema","true").load("2010-12-01.csv")
df.show(2)
df.select(coalesce(col("CustomerID"))).show(2,False)

+---------+---------+--------------------+--------+-------------------+---------+----------+--------------+
|InvoiceNo|StockCode|         Description|Quantity|        InvoiceDate|UnitPrice|CustomerID|       Country|
+---------+---------+--------------------+--------+-------------------+---------+----------+--------------+
|   536365|   85123A|WHITE HANGING HEA...|       6|2010-12-01 08:26:00|     2.55|   17850.0|United Kingdom|
|   536365|    71053| WHITE METAL LANTERN|       6|2010-12-01 08:26:00|     3.39|   17850.0|United Kingdom|
+---------+---------+--------------------+--------+-------------------+---------+----------+--------------+
only showing top 2 rows

+--------------------+
|coalesce(CustomerID)|
+--------------------+
|17850.0             |
|17850.0             |
+--------------------+
only showing top 2 rows



In [78]:
#create a dataframe in df variable like below
# +------+-------+-----+
# |  name|roll_no|class|
# +------+-------+-----+
# | first|      1|  1st|
# |second|      1|  2nd|
# | third|      2|  1st|
# |fourth|      2|  2nd|
# |  NULL|   NULL| NULL|
# |  sush|   NULL|  1st|
# +------+-------+-----+
#drop the records with any,all,all with specific columns 
from pyspark.sql.types import StructType,StructField,StringType,LongType
from pyspark.sql import Row
manual_schema=StructType([StructField("name",StringType(),True),StructField("roll_no",LongType(),True),StructField("class",StringType(),True)])
new_rows=[Row("first",1,"1st"),Row("second",1,"2nd"),Row("third",2,"1st"),Row("fourth",2,"2nd"),Row(None,None,None),Row("sush",None,"1st")]
new_df=spark.sparkContext.parallelize(new_rows)
df=spark.createDataFrame(new_df,manual_schema)
df.show()
df.na.drop().show()
df.na.drop("any").show()
df.na.drop("all").show()
df.na.drop("all",subset=["name","roll_no"]).show()

+------+-------+-----+
|  name|roll_no|class|
+------+-------+-----+
| first|      1|  1st|
|second|      1|  2nd|
| third|      2|  1st|
|fourth|      2|  2nd|
|  NULL|   NULL| NULL|
|  sush|   NULL|  1st|
+------+-------+-----+

+------+-------+-----+
|  name|roll_no|class|
+------+-------+-----+
| first|      1|  1st|
|second|      1|  2nd|
| third|      2|  1st|
|fourth|      2|  2nd|
+------+-------+-----+

+------+-------+-----+
|  name|roll_no|class|
+------+-------+-----+
| first|      1|  1st|
|second|      1|  2nd|
| third|      2|  1st|
|fourth|      2|  2nd|
+------+-------+-----+

+------+-------+-----+
|  name|roll_no|class|
+------+-------+-----+
| first|      1|  1st|
|second|      1|  2nd|
| third|      2|  1st|
|fourth|      2|  2nd|
|  sush|   NULL|  1st|
+------+-------+-----+

+------+-------+-----+
|  name|roll_no|class|
+------+-------+-----+
| first|      1|  1st|
|second|      1|  2nd|
| third|      2|  1st|
|fourth|      2|  2nd|
|  sush|   NULL|  1st|
+------

In [79]:
#with the same df now fill the values in 2 different ways
#first is filling values with nulling for specific columns
#second is filling values with each column each different value
df.na.fill("nulling_value").show()
df.na.fill("nulling",subset=["name","roll_no","class"]).show()
filling={"name":"new_value","roll_no":5,"class":"temp"}
df.na.fill(filling).show()

+-------------+-------+-------------+
|         name|roll_no|        class|
+-------------+-------+-------------+
|        first|      1|          1st|
|       second|      1|          2nd|
|        third|      2|          1st|
|       fourth|      2|          2nd|
|nulling_value|   NULL|nulling_value|
|         sush|   NULL|          1st|
+-------------+-------+-------------+

+-------+-------+-------+
|   name|roll_no|  class|
+-------+-------+-------+
|  first|      1|    1st|
| second|      1|    2nd|
|  third|      2|    1st|
| fourth|      2|    2nd|
|nulling|   NULL|nulling|
|   sush|   NULL|    1st|
+-------+-------+-------+

+---------+-------+-----+
|     name|roll_no|class|
+---------+-------+-----+
|    first|      1|  1st|
|   second|      1|  2nd|
|    third|      2|  1st|
|   fourth|      2|  2nd|
|new_value|      5| temp|
|     sush|      5|  1st|
+---------+-------+-----+



In [80]:
#using replace the column values to a different value
df.show()
df.na.replace(["1st"],["unknown"],"class").show()

+------+-------+-----+
|  name|roll_no|class|
+------+-------+-----+
| first|      1|  1st|
|second|      1|  2nd|
| third|      2|  1st|
|fourth|      2|  2nd|
|  NULL|   NULL| NULL|
|  sush|   NULL|  1st|
+------+-------+-----+

+------+-------+-------+
|  name|roll_no|  class|
+------+-------+-------+
| first|      1|unknown|
|second|      1|    2nd|
| third|      2|unknown|
|fourth|      2|    2nd|
|  NULL|   NULL|   NULL|
|  sush|   NULL|unknown|
+------+-------+-------+



In [81]:
#import struct, using struct select "Description","InvoiceNo" columns as "complex"
#create a table complexDF_table from the complexDF dataframe
from pyspark.sql.functions import struct
df=spark.read.format("csv").option("header","true").option("inferSchema","true").csv("2010-12-01.csv")
complexDF=df.select("*",struct("Description","InvoiceNo").alias("complex"))
complexDF.show(2,False)
complexDF.createOrReplaceTempView("complexDF_table")

+---------+---------+----------------------------------+--------+-------------------+---------+----------+--------------+--------------------------------------------+
|InvoiceNo|StockCode|Description                       |Quantity|InvoiceDate        |UnitPrice|CustomerID|Country       |complex                                     |
+---------+---------+----------------------------------+--------+-------------------+---------+----------+--------------+--------------------------------------------+
|536365   |85123A   |WHITE HANGING HEART T-LIGHT HOLDER|6       |2010-12-01 08:26:00|2.55     |17850.0   |United Kingdom|{WHITE HANGING HEART T-LIGHT HOLDER, 536365}|
|536365   |71053    |WHITE METAL LANTERN               |6       |2010-12-01 08:26:00|3.39     |17850.0   |United Kingdom|{WHITE METAL LANTERN, 536365}               |
+---------+---------+----------------------------------+--------+-------------------+---------+----------+--------------+--------------------------------------------

In [82]:
#from complexDF dataframe select col complex,col complex of description,col complex of InvoiceNo with getfield
#select complex data in different columns like description and invoiceno
complexDF.select(col("complex"),col("complex").getField("Description"),col("complex").getField("InvoiceNo")).show(2,False)
complexDF.select("complex.Description").show(2,False)
complexDF.select("complex.*").show(2,False)

+--------------------------------------------+----------------------------------+-----------------+
|complex                                     |complex.Description               |complex.InvoiceNo|
+--------------------------------------------+----------------------------------+-----------------+
|{WHITE HANGING HEART T-LIGHT HOLDER, 536365}|WHITE HANGING HEART T-LIGHT HOLDER|536365           |
|{WHITE METAL LANTERN, 536365}               |WHITE METAL LANTERN               |536365           |
+--------------------------------------------+----------------------------------+-----------------+
only showing top 2 rows

+----------------------------------+
|Description                       |
+----------------------------------+
|WHITE HANGING HEART T-LIGHT HOLDER|
|WHITE METAL LANTERN               |
+----------------------------------+
only showing top 2 rows

+----------------------------------+---------+
|Description                       |InvoiceNo|
+---------------------------------

In [83]:
#import the split function, with retail_data select description and 
#for every space split the description and rename new column as array_col
#for every space split the description and rename new column as array_col and select only 0th index values of array_col column
from pyspark.sql.functions import split
retail_data.show(2)
retail_data.select("Description",split("Description"," ").alias("array_col")).show(2,False)
retail_data.select("Description",split("Description"," ").alias("array_col")).selectExpr("array_col[0]").show(2,False)

+---------+---------+--------------------+--------+-------------------+---------+----------+--------------+
|InvoiceNo|StockCode|         Description|Quantity|        InvoiceDate|UnitPrice|CustomerID|       Country|
+---------+---------+--------------------+--------+-------------------+---------+----------+--------------+
|   536365|   85123A|WHITE HANGING HEA...|       6|2010-12-01 08:26:00|     2.55|   17850.0|United Kingdom|
|   536365|    71053| WHITE METAL LANTERN|       6|2010-12-01 08:26:00|     3.39|   17850.0|United Kingdom|
+---------+---------+--------------------+--------+-------------------+---------+----------+--------------+
only showing top 2 rows

+----------------------------------+----------------------------------------+
|Description                       |array_col                               |
+----------------------------------+----------------------------------------+
|WHITE HANGING HEART T-LIGHT HOLDER|[WHITE, HANGING, HEART, T-LIGHT, HOLDER]|
|WHITE METAL LA

In [84]:
#import the size function, with retail_data select description and 
#check the size after spliting the col description with " "
from pyspark.sql.functions import size
retail_data.select("Description",split("Description"," ").alias("new")).select("Description","new",size("new")).show(2,False)

retail_data.select("Description",size(split("Description"," "))).show(2,False)

+----------------------------------+----------------------------------------+---------+
|Description                       |new                                     |size(new)|
+----------------------------------+----------------------------------------+---------+
|WHITE HANGING HEART T-LIGHT HOLDER|[WHITE, HANGING, HEART, T-LIGHT, HOLDER]|5        |
|WHITE METAL LANTERN               |[WHITE, METAL, LANTERN]                 |3        |
+----------------------------------+----------------------------------------+---------+
only showing top 2 rows

+----------------------------------+-------------------------------+
|Description                       |size(split(Description,  , -1))|
+----------------------------------+-------------------------------+
|WHITE HANGING HEART T-LIGHT HOLDER|5                              |
|WHITE METAL LANTERN               |3                              |
+----------------------------------+-------------------------------+
only showing top 2 rows



In [85]:
#import the array_contains function, with retail_data select description and 
#check the array_contains has white in description column
from pyspark.sql.functions import array_contains
retail_data.select(col("Description"),array_contains(split("Description"," "),"WHITE")).show(2,False)

+----------------------------------+------------------------------------------------+
|Description                       |array_contains(split(Description,  , -1), WHITE)|
+----------------------------------+------------------------------------------------+
|WHITE HANGING HEART T-LIGHT HOLDER|true                                            |
|WHITE METAL LANTERN               |true                                            |
+----------------------------------+------------------------------------------------+
only showing top 2 rows



In [86]:
#import the explode function, with retail_data add new column "splitted" contains splitting of description with " "
#and add another new column "exploded" contains explod of splitted column
#and select columns "Description", "InvoiceNo", "exploded"
from pyspark.sql.functions import explode
retail_data.withColumn("splitted",split("Description"," ")).withColumn("exploded",explode("splitted")).select( "Description", "InvoiceNo", "exploded").show(10,False)

+----------------------------------+---------+--------+
|Description                       |InvoiceNo|exploded|
+----------------------------------+---------+--------+
|WHITE HANGING HEART T-LIGHT HOLDER|536365   |WHITE   |
|WHITE HANGING HEART T-LIGHT HOLDER|536365   |HANGING |
|WHITE HANGING HEART T-LIGHT HOLDER|536365   |HEART   |
|WHITE HANGING HEART T-LIGHT HOLDER|536365   |T-LIGHT |
|WHITE HANGING HEART T-LIGHT HOLDER|536365   |HOLDER  |
|WHITE METAL LANTERN               |536365   |WHITE   |
|WHITE METAL LANTERN               |536365   |METAL   |
|WHITE METAL LANTERN               |536365   |LANTERN |
|CREAM CUPID HEARTS COAT HANGER    |536365   |CREAM   |
|CREAM CUPID HEARTS COAT HANGER    |536365   |CUPID   |
+----------------------------------+---------+--------+
only showing top 10 rows



In [87]:
#import the create_map function, with retail_data select create_map of Description and InvoiceNo
#rename it as complex_map
#do the same as above but now extra select like new_col[some value]
#do the same as above but explode the complex_map column

from pyspark.sql.functions import create_map
retail_data.select(create_map("Description","InvoiceNo").alias("complex_map")).show(2,False)
retail_data.select(create_map("Description","InvoiceNo").alias("complex_map")).selectExpr("complex_map['WHITE METAL LANTERN']").show(5,False)
retail_data.select(create_map("Description","InvoiceNo").alias("complex_map")).selectExpr("explode(complex_map)").show(5,False)


+----------------------------------------------+
|complex_map                                   |
+----------------------------------------------+
|{WHITE HANGING HEART T-LIGHT HOLDER -> 536365}|
|{WHITE METAL LANTERN -> 536365}               |
+----------------------------------------------+
only showing top 2 rows

+--------------------------------+
|complex_map[WHITE METAL LANTERN]|
+--------------------------------+
|NULL                            |
|536365                          |
|NULL                            |
|NULL                            |
|NULL                            |
+--------------------------------+
only showing top 5 rows

+-----------------------------------+------+
|key                                |value |
+-----------------------------------+------+
|WHITE HANGING HEART T-LIGHT HOLDER |536365|
|WHITE METAL LANTERN                |536365|
|CREAM CUPID HEARTS COAT HANGER     |536365|
|KNITTED UNION FLAG HOT WATER BOTTLE|536365|
|RED WOOLLY HOTTIE WHITE H

In [88]:
#in jsondf variable store the range(1) and contains value of '{"jsonkey":{"jsonvalue":[1,2,3,4]}}' as jsonstring

jsondf=spark.range(1).selectExpr(""" '{"jsonkey":{"jsonvalue":[1,2,3,4]}}' as jsonstring""")
jsondf.show(1,False)

+-----------------------------------+
|jsonstring                         |
+-----------------------------------+
|{"jsonkey":{"jsonvalue":[1,2,3,4]}}|
+-----------------------------------+



In [89]:
#import the udf,col functions and longtype 
#in udfExampleDF create a df "num" contains 5 numbers
#create a power function(power3) that return power of 3
#convert the python function into udf and store in powering variable
#using udfExampleDF dataframe pass the values to the power3 udf 
#register that power3 udf and using udfExampleDF dataframe with selectExpr call the udf
from pyspark.sql.functions import udf,col
from pyspark.sql.types import LongType
udfExampleDF=spark.range(5).toDF("num")
def power3(val):
    return val**3
power3(2)
powering=udf(power3)
udfExampleDF.select(powering(col("num"))).show(5)
spark.udf.register("power3",power3,LongType())
udfExampleDF.selectExpr("power3(num)").show()

+-----------+
|power3(num)|
+-----------+
|          0|
|          1|
|          8|
|         27|
|         64|
+-----------+

+-----------+
|power3(num)|
+-----------+
|          0|
|          1|
|          8|
|         27|
|         64|
+-----------+

