In [0]:
from pyspark.sql.types import StructType, StructField, IntegerType, StringType

productData = [('Washing Machine', '1648770933000', 20000, 'Samsung', 'India', '0001'),
               ('Refrigerator', '1648770999000', 35000, ' LG', 'null', '0002'),
               ('Air Cooler', '1648770948000', 45000, ' Voltas', 'null', '0003')
              ]
              
#productSchema = ['Product_Name', 'Issue_Date', 'Price', 'Brand', 'Country', 'Product_Number']

productSchema= StructType([StructField('Product_Name', StringType(), True),
                           StructField('Issue_Date', StringType(), True),
                           StructField('Price', IntegerType(), True),
                           StructField('Brand', StringType(), True),
                           StructField('Country', StringType(), True),
                           StructField('Product_Number', StringType(), True)
                          ])

productDF= spark.createDataFrame(data= productData, schema= productSchema )

productDF.show(truncate= False)
productDF.printSchema()


+---------------+-------------+-----+-------+-------+--------------+
|Product_Name   |Issue_Date   |Price|Brand  |Country|Product_Number|
+---------------+-------------+-----+-------+-------+--------------+
|Washing Machine|1648770933000|20000|Samsung|India  |0001          |
|Refrigerator   |1648770999000|35000| LG    |null   |0002          |
|Air Cooler     |1648770948000|45000| Voltas|null   |0003          |
+---------------+-------------+-----+-------+-------+--------------+

root
 |-- Product_Name: string (nullable = true)
 |-- Issue_Date: string (nullable = true)
 |-- Price: integer (nullable = true)
 |-- Brand: string (nullable = true)
 |-- Country: string (nullable = true)
 |-- Product_Number: string (nullable = true)



In [0]:

from delta.tables import *

DeltaTable.createOrReplace(spark) \
          .tableName('product_details') \
          .addColumn('Product_Name', 'STRING') \
          .addColumn('Issue_Date', 'STRING') \
          .addColumn('Price', 'INTEGER') \
          .addColumn('Brand', 'STRING') \
          .addColumn('Country', 'STRING') \
          .addColumn('Product_Number', 'STRING') \
          .property('description', 'table created for assignment') \
          .location('dbfs:/user/hive/warehouse/product_details') \
          .execute()


Out[3]: <delta.tables.DeltaTable at 0x7fdfe772ac10>

In [0]:
%sql

select count(*) from product_details;

DESCRIBE product_details;


col_name,data_type,comment
Product_Name,string,
Issue_Date,string,
Price,int,
Brand,string,
Country,string,
Product_Number,string,
,,
# Partitioning,,
Not partitioned,,


In [0]:
productDF.write.format('delta').mode('overwrite').saveAsTable('default.product_details')


In [0]:
(spark.sql('select count(*) from product_details')).show()

display(spark.sql('select * from product_details order by Product_Number'))


+--------+
|count(1)|
+--------+
|       3|
+--------+



Product_Name,Issue_Date,Price,Brand,Country,Product_Number
Washing Machine,1648770933000,20000,Samsung,India,1
Refrigerator,1648770999000,35000,LG,,2
Air Cooler,1648770948000,45000,Voltas,,3


In [0]:
from pyspark.sql.functions import *

# productDF1= productDF.withColumn('Issue_Date_timestamp', from_unixtime(col('Issue_Date')))

# productDF1= productDF.withColumn('Issue_Date_timestamp', from_unixtime(col('Issue_Date'), "yyyy-MM-dd'T'HH:mm:ss[.SSS][ZZZ]"))

"""
Note that Unix Epoch time doesn’t support a fraction of the second which is represented with SSS.
https://sparkbyexamples.com/pyspark/pyspark-sql-working-with-unix-time-timestamp/
"""

productDF1= productDF.withColumn('Issue_Date_timestamp', from_unixtime(substring(col('Issue_Date'), 1, 10), "yyyy-MM-dd'T'HH:mm:ss[.SSS][ZZZ]"))

# , "yyyy-MM-dd'T'HH:mm:ss[.SSS][XXX]"
# For example, unix_timestamp, date_format, to_unix_timestamp, from_unixtime, to_date, to_timestamp, from_utc_timestamp, to_utc_timestamp,
# productDF2= productDF.withColumn('Issue_Date_timestamp', substring('Issue_Date', 1, 10))

#productDF.show(truncate=False)
productDF1.show(truncate=False)
# productDF2.show(truncate=False)


+---------------+-------------+-----+-------+-------+--------------+----------------------------+
|Product_Name   |Issue_Date   |Price|Brand  |Country|Product_Number|Issue_Date_timestamp        |
+---------------+-------------+-----+-------+-------+--------------+----------------------------+
|Washing Machine|1648770933000|20000|Samsung|India  |0001          |2022-03-31T23:55:33.000+0000|
|Refrigerator   |1648770999000|35000| LG    |null   |0002          |2022-03-31T23:56:39.000+0000|
|Air Cooler     |1648770948000|45000| Voltas|null   |0003          |2022-03-31T23:55:48.000+0000|
+---------------+-------------+-----+-------+-------+--------------+----------------------------+



In [0]:

productDF1= productDF1.withColumn('Issue_date_todate', to_date(col('Issue_Date_timestamp')))
# OR
# productDF2= productDF1.withColumn('Issue_date_todate', from_unixtime(col('Issue_Date_timestamp'), 'yyyy-MM-dd'))

#productDF.show(truncate=False)
productDF1.show(truncate=False)



+---------------+-------------+-----+-------+-------+--------------+----------------------------+-----------------+
|Product_Name   |Issue_Date   |Price|Brand  |Country|Product_Number|Issue_Date_timestamp        |Issue_date_todate|
+---------------+-------------+-----+-------+-------+--------------+----------------------------+-----------------+
|Washing Machine|1648770933000|20000|Samsung|India  |0001          |2022-03-31T23:55:33.000+0000|2022-03-31       |
|Refrigerator   |1648770999000|35000| LG    |null   |0002          |2022-03-31T23:56:39.000+0000|2022-03-31       |
|Air Cooler     |1648770948000|45000| Voltas|null   |0003          |2022-03-31T23:55:48.000+0000|2022-03-31       |
+---------------+-------------+-----+-------+-------+--------------+----------------------------+-----------------+



In [0]:

productDF1= productDF1.withColumn("Brand_without_space",ltrim("Brand"))

productDF1.show(truncate= False)



+---------------+-------------+-----+-------+-------+--------------+----------------------------+-----------------+-------------------+
|Product_Name   |Issue_Date   |Price|Brand  |Country|Product_Number|Issue_Date_timestamp        |Issue_date_todate|Brand_without_space|
+---------------+-------------+-----+-------+-------+--------------+----------------------------+-----------------+-------------------+
|Washing Machine|1648770933000|20000|Samsung|India  |0001          |2022-03-31T23:55:33.000+0000|2022-03-31       |Samsung            |
|Refrigerator   |1648770999000|35000| LG    |null   |0002          |2022-03-31T23:56:39.000+0000|2022-03-31       |LG                 |
|Air Cooler     |1648770948000|45000| Voltas|null   |0003          |2022-03-31T23:55:48.000+0000|2022-03-31       |Voltas             |
+---------------+-------------+-----+-------+-------+--------------+----------------------------+-----------------+-------------------+



In [0]:
#Replace part of string with another string
from pyspark.sql.functions import regexp_replace

productDF1 = productDF1.withColumn('Country_NoNull', regexp_replace('Country', 'null', ''))

productDF1.show(truncate=False)

"""
productDF2= productDF1.withColumn('Country', regexp_replace('Country', 'null', ''))
productDF2.show()
"""

+---------------+-------------+-----+-------+-------+--------------+----------------------------+-----------------+-------------------+--------------+
|Product_Name   |Issue_Date   |Price|Brand  |Country|Product_Number|Issue_Date_timestamp        |Issue_date_todate|Brand_without_space|Country_NoNull|
+---------------+-------------+-----+-------+-------+--------------+----------------------------+-----------------+-------------------+--------------+
|Washing Machine|1648770933000|20000|Samsung|India  |0001          |2022-03-31T23:55:33.000+0000|2022-03-31       |Samsung            |India         |
|Refrigerator   |1648770999000|35000| LG    |null   |0002          |2022-03-31T23:56:39.000+0000|2022-03-31       |LG                 |              |
|Air Cooler     |1648770948000|45000| Voltas|null   |0003          |2022-03-31T23:55:48.000+0000|2022-03-31       |Voltas             |              |
+---------------+-------------+-----+-------+-------+--------------+--------------------------

In [0]:
from pyspark.sql.types import StructType, StructField, StringType, IntegerType

sourceData= [(150711, 123456, 'EN', '456789', '2021-12-27T08:20:29.842+0000', '0001'),
             (150439, 234567, 'UK', '345678', '2021-01-28T08:21:14.645+0000', '0002'),
             (150647, 345678, 'ES', '234567', '2021-12-27T08:22:42.445+0000', '0003')
             ]

#sourceSchema= ['SourceId', 'TransactionNumber', 'Language', 'ModelNumber', 'StartTime', 'ProductNumber']

sourceSchema= StructType([StructField('SourceId', IntegerType(), True),
                          StructField('TransactionNumber', IntegerType(), True),
                          StructField('Language', StringType(), True),
                          StructField('ModelNumber', StringType(), True),
                          StructField('StartTime', StringType(), True),
                          StructField('ProductNumber', StringType(), True)
                         ])

sourceDF= spark.createDataFrame(data= sourceData, schema= sourceSchema)

sourceDF.show(truncate=False)
sourceDF.printSchema()


+--------+-----------------+--------+-----------+----------------------------+-------------+
|SourceId|TransactionNumber|Language|ModelNumber|StartTime                   |ProductNumber|
+--------+-----------------+--------+-----------+----------------------------+-------------+
|150711  |123456           |EN      |456789     |2021-12-27T08:20:29.842+0000|0001         |
|150439  |234567           |UK      |345678     |2021-01-28T08:21:14.645+0000|0002         |
|150647  |345678           |ES      |234567     |2021-12-27T08:22:42.445+0000|0003         |
+--------+-----------------+--------+-----------+----------------------------+-------------+

root
 |-- SourceId: integer (nullable = true)
 |-- TransactionNumber: integer (nullable = true)
 |-- Language: string (nullable = true)
 |-- ModelNumber: string (nullable = true)
 |-- StartTime: string (nullable = true)
 |-- ProductNumber: string (nullable = true)



In [0]:
%sql
CREATE TABLE IF NOT EXISTS source_details(
                                          SourceId            INT,
                                          TransactionNumber   INT,
                                          Language            STRING,
                                          ModelNumber         STRING,
                                          StartTime           STRING,
                                          ProductNumber       STRING
                                          ) USING DELTA
                                            LOCATION 'dbfs:/user/hive/warehouse/source_details' ;


In [0]:
%sql

select count(*) from source_details;

describe source_details;


col_name,data_type,comment
SourceId,int,
TransactionNumber,int,
Language,string,
ModelNumber,string,
StartTime,string,
ProductNumber,string,
,,
# Partitioning,,
Not partitioned,,


In [0]:

sourceDF.write.format('delta').mode('overwrite').saveAsTable('source_details')


In [0]:
(spark.sql('select count(*) from source_details')).show()

display(spark.sql('select * from source_details'))


+--------+
|count(1)|
+--------+
|       3|
+--------+



SourceId,TransactionNumber,Language,ModelNumber,StartTime,ProductNumber
150711,123456,EN,456789,2021-12-27T08:20:29.842+0000,1
150439,234567,UK,345678,2021-01-28T08:21:14.645+0000,2
150647,345678,ES,234567,2021-12-27T08:22:42.445+0000,3


In [0]:
sourceDF.columns


Out[19]: ['SourceId',
 'TransactionNumber',
 'Language',
 'ModelNumber',
 'StartTime',
 'ProductNumber']

In [0]:
import re
name= 'SourceId'
name1= 'SourceId'
# name = sourceDF.columns
name = re.sub(r'(?<!^)(?=[A-Z])', '_', name).lower()
print(name)

name1 = re.sub(r'(?<!^)(?=[A-Z])', '_', name1)
print(name1)  # Source_Id

name1 = name1.upper()
print(name)  # source_id

source_id
Source_Id
source_id


In [0]:
import re

def camel_to_snake(name):
    name = re.sub('(.)([A-Z][a-z]+)', r'\1_\2', name)
    return re.sub('([a-z0-9])([A-Z])', r'\1_\2', name).lower()


#print(camel_to_snake('getHTTPResponseCode'))  # get_http_response_code
#print(camel_to_snake('HTTPResponseCodeXYZ'))  # http_response_code_xyz
#print(camel_to_snake('SourceId'))  # 

sourceDF1 = sourceDF.withColumnRenamed('SourceId', camel_to_snake('SourceId')) \
                    .withColumnRenamed('TransactionNumber', camel_to_snake('TransactionNumber')) \
                    .withColumnRenamed('Language', camel_to_snake('Language'))\
                    .withColumnRenamed('ModelNumber', camel_to_snake('ModelNumber')) \
                    .withColumnRenamed('StartTime', camel_to_snake('StartTime')) \
                    .withColumnRenamed('ProductNumber', camel_to_snake('ProductNumber'))

#sourceDF1 = sourceDF.withColumnRenamed('SourceId','source_id',)

sourceDF1.show(truncate= False)


+---------+------------------+--------+------------+----------------------------+--------------+
|source_id|transaction_number|language|model_number|start_time                  |product_number|
+---------+------------------+--------+------------+----------------------------+--------------+
|150711   |123456            |EN      |456789      |2021-12-27T08:20:29.842+0000|0001          |
|150439   |234567            |UK      |345678      |2021-01-28T08:21:14.645+0000|0002          |
|150647   |345678            |ES      |234567      |2021-12-27T08:22:42.445+0000|0003          |
+---------+------------------+--------+------------+----------------------------+--------------+



In [0]:
# unix_timestamp, date_format, to_unix_timestamp, from_unixtime, to_date, to_timestamp, from_utc_timestamp, to_utc_timestamp,unix_timestamp

#sourceDF2= sourceDF1.withColumn('start_time_ms', date_format('start_time',"yyyy-MM-dd HH:mm:ss[.SSS]"))
#  "yyyy-MM-dd'T'HH:mm:ss[.SSS][XXX]"
#sourceDF2.show(truncate= False)
#sourceDF1= sourceDF1.withColumn('start_time_ms', unix_timestamp(to_date(date_format('start_time',"yyyy-MM-dd HH:mm:ss.SSS z")+ substring('start_time',-7,3).cast('float'))))
#sourceDF1= sourceDF1.withColumn('start_time_sbr',substring('start_time',-7,3))

sourceDF1= sourceDF1.withColumn('start_time_ms', concat(unix_timestamp(to_date(date_format('start_time',"yyyy-MM-dd HH:mm:ss.SSS"))), substring('start_time',21,3)))


##sourceDF1.withColumn('start_time_ms', unix_timestamp(to_date(date_format('start_time',"yyyy-MM-dd HH:mm:ss.SSS")))).show(truncate= False)
##sourceDF1.select(concat('start_time',substring('start_time',21,3))).show(truncate= False)
"""
Note that Unix Epoch time doesn’t support a fraction of the second which is represented with SSS.
https://sparkbyexamples.com/pyspark/pyspark-sql-working-with-unix-time-timestamp/
"""

sourceDF1.show(truncate= False)


+---------+------------------+--------+------------+----------------------------+--------------+-------------+
|source_id|transaction_number|language|model_number|start_time                  |product_number|start_time_ms|
+---------+------------------+--------+------------+----------------------------+--------------+-------------+
|150711   |123456            |EN      |456789      |2021-12-27T08:20:29.842+0000|0001          |1640563200842|
|150439   |234567            |UK      |345678      |2021-01-28T08:21:14.645+0000|0002          |1611792000645|
|150647   |345678            |ES      |234567      |2021-12-27T08:22:42.445+0000|0003          |1640563200445|
+---------+------------------+--------+------------+----------------------------+--------------+-------------+



In [0]:
joinDF= productDF.join(sourceDF, productDF.Product_Number== sourceDF.ProductNumber, 'FULLOUTER')

joinDF.show(truncate=False)
joinDF.printSchema()


+---------------+-------------+-----+-------+-------+--------------+--------+-----------------+--------+-----------+----------------------------+-------------+
|Product_Name   |Issue_Date   |Price|Brand  |Country|Product_Number|SourceId|TransactionNumber|Language|ModelNumber|StartTime                   |ProductNumber|
+---------------+-------------+-----+-------+-------+--------------+--------+-----------------+--------+-----------+----------------------------+-------------+
|Washing Machine|1648770933000|20000|Samsung|India  |0001          |150711  |123456           |EN      |456789     |2021-12-27T08:20:29.842+0000|0001         |
|Refrigerator   |1648770999000|35000| LG    |null   |0002          |150439  |234567           |UK      |345678     |2021-01-28T08:21:14.645+0000|0002         |
|Air Cooler     |1648770948000|45000| Voltas|null   |0003          |150647  |345678           |ES      |234567     |2021-12-27T08:22:42.445+0000|0003         |
+---------------+-------------+-----+---

In [0]:
# Using equals condition
joinDF.filter(joinDF.Language == "EN").show(truncate=False)

joinDF.select('Country').filter(joinDF.Language == "EN").show(truncate=False)


+---------------+-------------+-----+-------+-------+--------------+--------+-----------------+--------+-----------+----------------------------+-------------+
|Product_Name   |Issue_Date   |Price|Brand  |Country|Product_Number|SourceId|TransactionNumber|Language|ModelNumber|StartTime                   |ProductNumber|
+---------------+-------------+-----+-------+-------+--------------+--------+-----------------+--------+-----------+----------------------------+-------------+
|Washing Machine|1648770933000|20000|Samsung|India  |0001          |150711  |123456           |EN      |456789     |2021-12-27T08:20:29.842+0000|0001         |
+---------------+-------------+-----+-------+-------+--------------+--------+-----------------+--------+-----------+----------------------------+-------------+

+-------+
|Country|
+-------+
|India  |
+-------+



In [0]:
#display(spark.sql('select * from product_details p inner join source_details s on p.Product_Number = s.ProductNumber'))

display(spark.sql('select * from product_details p FULL JOIN source_details s on p.Product_Number = s.ProductNumber'))


Product_Name,Issue_Date,Price,Brand,Country,Product_Number,SourceId,TransactionNumber,Language,ModelNumber,StartTime,ProductNumber
Washing Machine,1648770933000,20000,Samsung,India,1,150711,123456,EN,456789,2021-12-27T08:20:29.842+0000,1
Refrigerator,1648770999000,35000,LG,,2,150439,234567,UK,345678,2021-01-28T08:21:14.645+0000,2
Air Cooler,1648770948000,45000,Voltas,,3,150647,345678,ES,234567,2021-12-27T08:22:42.445+0000,3


In [0]:
display(spark.sql('select * from product_details p FULL JOIN source_details s on p.Product_Number = s.ProductNumber where s.Language = "EN" '))

display(spark.sql('select p.Country from product_details p FULL JOIN source_details s on p.Product_Number = s.ProductNumber where s.Language = "EN" '))


Product_Name,Issue_Date,Price,Brand,Country,Product_Number,SourceId,TransactionNumber,Language,ModelNumber,StartTime,ProductNumber
Washing Machine,1648770933000,20000,Samsung,India,1,150711,123456,EN,456789,2021-12-27T08:20:29.842+0000,1


Country
India
