In [0]:
#Importing pyspark Lib.
import pyspark

#Making spark session
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName('assign').getOrCreate()

#About Spark
spark

In [0]:
#Que 1. Create a table using below information.

In [0]:
Product_data =[('Washing Machine',1648770933, 20000,'Samsung', 'India','0001'),
               ('Refrigerator',1648770999,35000,' LG','null','0002'),
               ('Air Cooler',1648770948,45000,' Voltas','null','0003')]

user_schema = ["Product_Name","Issue_Date","Price","Brand","Country","Product_number"]
DF= spark.createDataFrame(data=Product_data,schema=user_schema)

DF.printSchema()
display(DF)

root
 |-- Product_Name: string (nullable = true)
 |-- Issue_Date: long (nullable = true)
 |-- Price: long (nullable = true)
 |-- Brand: string (nullable = true)
 |-- Country: string (nullable = true)
 |-- Product_number: string (nullable = true)



Product_Name,Issue_Date,Price,Brand,Country,Product_number
Washing Machine,1648770933,20000,Samsung,India,1
Refrigerator,1648770999,35000,LG,,2
Air Cooler,1648770948,45000,Voltas,,3


In [0]:
#Convert the Issue Date with the timestamp format.

In [0]:
from pyspark.sql.functions import *
DF1= DF.withColumn('Issue_Date_timestamp', from_unixtime(substring(col('Issue_Date'), 1, 10), "yyyy-MM-dd'T'HH:mm:ss[.SSS][ZZZ]"))
DF1.show(truncate=False)

+---------------+----------+-----+-------+-------+--------------+----------------------------+
|Product_Name   |Issue_Date|Price|Brand  |Country|Product_number|Issue_Date_timestamp        |
+---------------+----------+-----+-------+-------+--------------+----------------------------+
|Washing Machine|1648770933|20000|Samsung|India  |0001          |2022-03-31T23:55:33.000+0000|
|Refrigerator   |1648770999|35000| LG    |null   |0002          |2022-03-31T23:56:39.000+0000|
|Air Cooler     |1648770948|45000| Voltas|null   |0003          |2022-03-31T23:55:48.000+0000|
+---------------+----------+-----+-------+-------+--------------+----------------------------+



In [0]:
#b) Convert timestamp to date type

In [0]:
DF1= DF1.withColumn('Issue_date_todate', to_date(col('Issue_Date_timestamp')))

In [0]:
#c) Remove the starting extra space in Brand column for LG and Voltas fields

In [0]:
DF1= DF1.withColumn("Brand_",ltrim("Brand"))

DF1.show(truncate= False)

+---------------+----------+-----+-------+-------+--------------+----------------------------+-----------------+-------+
|Product_Name   |Issue_Date|Price|Brand  |Country|Product_number|Issue_Date_timestamp        |Issue_date_todate|Brand_ |
+---------------+----------+-----+-------+-------+--------------+----------------------------+-----------------+-------+
|Washing Machine|1648770933|20000|Samsung|India  |0001          |2022-03-31T23:55:33.000+0000|2022-03-31       |Samsung|
|Refrigerator   |1648770999|35000| LG    |null   |0002          |2022-03-31T23:56:39.000+0000|2022-03-31       |LG     |
|Air Cooler     |1648770948|45000| Voltas|null   |0003          |2022-03-31T23:55:48.000+0000|2022-03-31       |Voltas |
+---------------+----------+-----+-------+-------+--------------+----------------------------+-----------------+-------+



In [0]:
#d) Replace null values with empty values in Country column

In [0]:
from pyspark.sql.functions import regexp_replace

DF1 = DF1.withColumn('Country_without_null_values', regexp_replace('Country', 'null', ''))

DF1.show(truncate=False)

+---------------+----------+-----+-------+-------+--------------+----------------------------+-----------------+-------+---------------------------+
|Product_Name   |Issue_Date|Price|Brand  |Country|Product_number|Issue_Date_timestamp        |Issue_date_todate|Brand_ |Country_without_null_values|
+---------------+----------+-----+-------+-------+--------------+----------------------------+-----------------+-------+---------------------------+
|Washing Machine|1648770933|20000|Samsung|India  |0001          |2022-03-31T23:55:33.000+0000|2022-03-31       |Samsung|India                      |
|Refrigerator   |1648770999|35000| LG    |null   |0002          |2022-03-31T23:56:39.000+0000|2022-03-31       |LG     |                           |
|Air Cooler     |1648770948|45000| Voltas|null   |0003          |2022-03-31T23:55:48.000+0000|2022-03-31       |Voltas |                           |
+---------------+----------+-----+-------+-------+--------------+----------------------------+------------

In [0]:
#2. Create a table using below information

In [0]:
transaction_info = [(150711,123456,"EN",456789,"2021-12-27T08:20:29.842+0000","0001"),
                    (150439,234567,"UK",345678,"2021-12-27T08:21:14.645+0000","0002"),
                    (150647,345678,"ES",234567,"2021-12-27T08:22:42.445+0000","0003")
                   ]

transaction_schema = ["SourceId","TransactionNumber","Language","ModelNumber","StartTime","ProductNumber"]

df = spark.createDataFrame(data=transaction_info,schema=transaction_schema)

df.printSchema()
display(df)

root
 |-- SourceId: long (nullable = true)
 |-- TransactionNumber: long (nullable = true)
 |-- Language: string (nullable = true)
 |-- ModelNumber: long (nullable = true)
 |-- StartTime: string (nullable = true)
 |-- ProductNumber: string (nullable = true)



SourceId,TransactionNumber,Language,ModelNumber,StartTime,ProductNumber
150711,123456,EN,456789,2021-12-27T08:20:29.842+0000,1
150439,234567,UK,345678,2021-12-27T08:21:14.645+0000,2
150647,345678,ES,234567,2021-12-27T08:22:42.445+0000,3


In [0]:
"""
a) Change the camel case columns to snake case
Example: SourceId: source_id, TransactionNumber: transaction_number
"""

In [0]:
import re

def camel_to_snake(name):
    name = re.sub('(.)([A-Z][a-z]+)', r'\1_\2', name)
    return re.sub('([a-z0-9])([A-Z])', r'\1_\2', name).lower()

df1 = df.withColumnRenamed('SourceId', camel_to_snake('source_id')) \
                    .withColumnRenamed('TransactionNumber', camel_to_snake('transaction_number')) \
                    .withColumnRenamed('Language', camel_to_snake('language'))\
                    .withColumnRenamed('ModelNumber', camel_to_snake('model_number')) \
                    .withColumnRenamed('StartTime', camel_to_snake('start_time')) \
                    .withColumnRenamed('ProductNumber', camel_to_snake('product_number'))

#sourceDF1 = sourceDF.withColumnRenamed('SourceId','source_id',)

df1.show(truncate= False)

+---------+------------------+--------+------------+----------------------------+--------------+
|source_id|transaction_number|language|model_number|start_time                  |product_number|
+---------+------------------+--------+------------+----------------------------+--------------+
|150711   |123456            |EN      |456789      |2021-12-27T08:20:29.842+0000|0001          |
|150439   |234567            |UK      |345678      |2021-12-27T08:21:14.645+0000|0002          |
|150647   |345678            |ES      |234567      |2021-12-27T08:22:42.445+0000|0003          |
+---------+------------------+--------+------------+----------------------------+--------------+



In [0]:
"""
b) Add another column as start_time_ms and convert the values of StartTime to milliseconds.

Example:
Input: 2021-12-27T08:20:29.842+0000 -> Output: 1640593229842
Input: 2021-12-27T08:21:14.645+0000 -> Output: 1640593274645
Input: 2021-12-27T08:22:42.445+0000 -> Output: 1640593362445
Input: 2021-12-27T08:22:43.183+0000 -> Output: 1640593363183
"""

In [0]:
df1= df1.withColumn('start_time_ms', concat(unix_timestamp(to_date(date_format('start_time',"yyyy-MM-dd HH:mm:ss.SSS"))),substring('start_time',21,3)))
df1.show(truncate= False)

+---------+------------------+--------+------------+----------------------------+--------------+-------------+
|source_id|transaction_number|language|model_number|start_time                  |product_number|start_time_ms|
+---------+------------------+--------+------------+----------------------------+--------------+-------------+
|150711   |123456            |EN      |456789      |2021-12-27T08:20:29.842+0000|0001          |1640563200842|
|150439   |234567            |UK      |345678      |2021-12-27T08:21:14.645+0000|0002          |1640563200645|
|150647   |345678            |ES      |234567      |2021-12-27T08:22:42.445+0000|0003          |1640563200445|
+---------+------------------+--------+------------+----------------------------+--------------+-------------+



In [0]:
#3. Combine both the tables based on the Product Number
#and get all the fields in return.

In [0]:
join_df= DF.join(df, DF.Product_number == df.ProductNumber, 'FULLOUTER')
#sourceDF =df
#productDF = DF
df3=join_df.show(truncate=False)
join_df.printSchema()

+---------------+----------+-----+-------+-------+--------------+--------+-----------------+--------+-----------+----------------------------+-------------+
|Product_Name   |Issue_Date|Price|Brand  |Country|Product_number|SourceId|TransactionNumber|Language|ModelNumber|StartTime                   |ProductNumber|
+---------------+----------+-----+-------+-------+--------------+--------+-----------------+--------+-----------+----------------------------+-------------+
|Washing Machine|1648770933|20000|Samsung|India  |0001          |150711  |123456           |EN      |456789     |2021-12-27T08:20:29.842+0000|0001         |
|Refrigerator   |1648770999|35000| LG    |null   |0002          |150439  |234567           |UK      |345678     |2021-12-27T08:21:14.645+0000|0002         |
|Air Cooler     |1648770948|45000| Voltas|null   |0003          |150647  |345678           |ES      |234567     |2021-12-27T08:22:42.445+0000|0003         |
+---------------+----------+-----+-------+-------+--------

In [0]:
#And get the country as EN

In [0]:
# Using equals condition
join_df.filter(join_df.Language == "EN").show(truncate=False)

df3 = join_df.select('Country').filter(join_df.Language == "EN").show(truncate=False)


+---------------+----------+-----+-------+-------+--------------+--------+-----------------+--------+-----------+----------------------------+-------------+
|Product_Name   |Issue_Date|Price|Brand  |Country|Product_number|SourceId|TransactionNumber|Language|ModelNumber|StartTime                   |ProductNumber|
+---------------+----------+-----+-------+-------+--------------+--------+-----------------+--------+-----------+----------------------------+-------------+
|Washing Machine|1648770933|20000|Samsung|India  |0001          |150711  |123456           |EN      |456789     |2021-12-27T08:20:29.842+0000|0001         |
+---------------+----------+-----+-------+-------+--------------+--------+-----------------+--------+-----------+----------------------------+-------------+

+-------+
|Country|
+-------+
|India  |
+-------+



In [0]:
from pyspark.sql.functions import when
df3.withColumn('Country',when(df3.Country.endswith('EN'),regexp_replace(df3.Country,'EN','EN')).when(df3.Country.endswith('UK'),regexp_replace(df3.Country,'UK','EN')).when(df3.Country.endswith('ES'),regexp_replace(df3.Country,'ES','EN')).otherwise(df3.Country)).show(truncate=False)

[0;31m---------------------------------------------------------------------------[0m
[0;31mAttributeError[0m                            Traceback (most recent call last)
[0;32m<command-2180890536996593>[0m in [0;36m<module>[0;34m[0m
[1;32m      1[0m [0;32mfrom[0m [0mpyspark[0m[0;34m.[0m[0msql[0m[0;34m.[0m[0mfunctions[0m [0;32mimport[0m [0mwhen[0m[0;34m[0m[0;34m[0m[0m
[0;32m----> 2[0;31m [0mdf3[0m[0;34m.[0m[0mwithColumn[0m[0;34m([0m[0;34m'Country'[0m[0;34m,[0m[0mwhen[0m[0;34m([0m[0mdf3[0m[0;34m.[0m[0mCountry[0m[0;34m.[0m[0mendswith[0m[0;34m([0m[0;34m'EN'[0m[0;34m)[0m[0;34m,[0m[0mregexp_replace[0m[0;34m([0m[0mdf3[0m[0;34m.[0m[0mCountry[0m[0;34m,[0m[0;34m'EN'[0m[0;34m,[0m[0;34m'EN'[0m[0;34m)[0m[0;34m)[0m[0;34m.[0m[0mwhen[0m[0;34m([0m[0mdf3[0m[0;34m.[0m[0mCountry[0m[0;34m.[0m[0mendswith[0m[0;34m([0m[0;34m'UK'[0m[0;34m)[0m[0;34m,[0m[0mregexp_replace[0m[0;34m([0m[0mdf3[0m[

In [0]:
df3.Country.replace('UK', EN).show()

[0;31m---------------------------------------------------------------------------[0m
[0;31mAttributeError[0m                            Traceback (most recent call last)
[0;32m<command-2180890536996594>[0m in [0;36m<module>[0;34m[0m
[0;32m----> 1[0;31m [0mdf3[0m[0;34m.[0m[0mCountry[0m[0;34m.[0m[0mreplace[0m[0;34m([0m[0;34m'UK'[0m[0;34m,[0m [0mEN[0m[0;34m)[0m[0;34m.[0m[0mshow[0m[0;34m([0m[0;34m)[0m[0;34m[0m[0;34m[0m[0m
[0m
[0;31mAttributeError[0m: 'NoneType' object has no attribute 'Country'

In [0]:
from pyspark.sql.functions import *
newDf = df3.regexp_replace('Country', regexp_replace('Country', 'UK', 'EN'))

[0;31m---------------------------------------------------------------------------[0m
[0;31mAttributeError[0m                            Traceback (most recent call last)
[0;32m<command-2180890536996595>[0m in [0;36m<module>[0;34m[0m
[1;32m      1[0m [0;32mfrom[0m [0mpyspark[0m[0;34m.[0m[0msql[0m[0;34m.[0m[0mfunctions[0m [0;32mimport[0m [0;34m*[0m[0;34m[0m[0;34m[0m[0m
[0;32m----> 2[0;31m [0mnewDf[0m [0;34m=[0m [0mdf3[0m[0;34m.[0m[0mregexp_replace[0m[0;34m([0m[0;34m'Country'[0m[0;34m,[0m [0mregexp_replace[0m[0;34m([0m[0;34m'Country'[0m[0;34m,[0m [0;34m'UK'[0m[0;34m,[0m [0;34m'EN'[0m[0;34m)[0m[0;34m)[0m[0;34m[0m[0;34m[0m[0m
[0m
[0;31mAttributeError[0m: 'NoneType' object has no attribute 'regexp_replace'