In [0]:
import pyspark
from pyspark import SparkContext
from pyspark.sql import SparkSession
from pyspark.sql import SQLContext
from datetime import datetime
from pyspark.sql.types import *

In [0]:
# How to read csv file and infer the schema
sc

In [0]:
spark = SparkSession.builder.appName("Task_1").getOrCreate()
data = spark.read.format("csv").option("header","true").option("delimiter",";").option("inferSchema","true").load("dbfs:/FileStore/tables/assets.csv")

In [0]:
data.printSchema()

root
 |-- description: string (nullable = true)
 |-- parent_func_loc: string (nullable = true)
 |-- WMT_LOCATION_ID: integer (nullable = true)
 |-- WMT_TAG_HISTORYREQUIRED: string (nullable = true)
 |-- WMT_TAG_DESC: string (nullable = true)
 |-- WMT_TAG_NAME: string (nullable = true)
 |-- WMT_CONTRACTOR_ID: integer (nullable = true)
 |-- WMT_PO_ID: integer (nullable = true)
 |-- WMT_AREA_ID: integer (nullable = true)
 |-- WMT_TAG_ID_ANCESTOR: integer (nullable = true)
 |-- loc: string (nullable = true)
 |-- WMT_TAG_ID: integer (nullable = true)



In [0]:
data.show(4)

+--------------------+--------------------+---------------+-----------------------+--------------------+------------+-----------------+---------+-----------+-------------------+-----------+----------+
|         description|     parent_func_loc|WMT_LOCATION_ID|WMT_TAG_HISTORYREQUIRED|        WMT_TAG_DESC|WMT_TAG_NAME|WMT_CONTRACTOR_ID|WMT_PO_ID|WMT_AREA_ID|WMT_TAG_ID_ANCESTOR|        loc|WMT_TAG_ID|
+--------------------+--------------------+---------------+-----------------------+--------------------+------------+-----------------+---------+-----------+-------------------+-----------+----------+
|   Valhall plattform|             Aker BP|           null|                   null|   Valhall plattform|         VAL|             null|     null|       null|               null|        VAL|         0|
|VRD - PH 1STSTGCO...|23-1ST STAGE COMP...|           1004|                      Y|VRD - PH 1STSTGCO...| 23-TT-92533|             1686|     8309|       1600|             681824|23-TT-92533|      4

In [0]:
# How to write the csv data to delta format 
data.write.format("delta").saveAsTable("delta_table3") 

In [0]:
# How to read contents of the delta table
spark.table("delta_table3").show(5)

+--------------------+--------------------+---------------+-----------------------+--------------------+------------+-----------------+---------+-----------+-------------------+-----------+----------+
|         description|     parent_func_loc|WMT_LOCATION_ID|WMT_TAG_HISTORYREQUIRED|        WMT_TAG_DESC|WMT_TAG_NAME|WMT_CONTRACTOR_ID|WMT_PO_ID|WMT_AREA_ID|WMT_TAG_ID_ANCESTOR|        loc|WMT_TAG_ID|
+--------------------+--------------------+---------------+-----------------------+--------------------+------------+-----------------+---------+-----------+-------------------+-----------+----------+
|   Valhall plattform|             Aker BP|           null|                   null|   Valhall plattform|         VAL|             null|     null|       null|               null|        VAL|         0|
|VRD - PH 1STSTGCO...|23-1ST STAGE COMP...|           1004|                      Y|VRD - PH 1STSTGCO...| 23-TT-92533|             1686|     8309|       1600|             681824|23-TT-92533|      4

In [0]:
# Create a table and load csv file dropping duplicates, append those records in existing table
data1=spark.read.format("csv").option("inferSchema","true").option("header","true").load("dbfs:/FileStore/events2.csv")
data1.count()
data1.dropDuplicates().count()
data1.show(2)

+--------------------+--------------------+----------+-------------+---------------------+-----------------------+-----------------------------+------------------------------+-------------------------------+-------------------------------+-------------------------------+-------------------------------+--------------------------------+--------------------------------+---------------------------------+---------------------------------+---------------------------------+-----------------------------------+------------------------------------+------------------------------------+-------------------------------------+-------------------------------------+-------------------------------------+--------------------------------------+--------------------------------------+--------------------------------------+---------------------------------------+---------------------------------------+---------------------------------------+---------------------------------------+----------------------------

In [0]:
# Create one table and uploading another csv file by dropping duplicates
data1.write.format("delta").option("mergeSchema","true").mode("append").saveAsTable("delta_table3")

In [0]:
# How to read contents of the updated delta table
spark.table("delta_table3").show(5)

+--------------------+--------------------+---------------+-----------------------+--------------------+------------+-----------------+---------+-----------+-------------------+-----------+----------+---------------+-------------+----------+-------------+---------------------+-----------------------+-----------------------------+------------------------------+-------------------------------+-------------------------------+-------------------------------+-------------------------------+--------------------------------+--------------------------------+---------------------------------+---------------------------------+---------------------------------+-----------------------------------+------------------------------------+------------------------------------+-------------------------------------+-------------------------------------+-------------------------------------+--------------------------------------+--------------------------------------+--------------------------------------+-

In [0]:
spark = SparkSession(sc)
sqlContext = SQLContext(sc)



In [0]:
# Select distinct output state as per latest created date
data2 = spark.read.format("csv").option("header","true").option("inferSchema","true").option("delimiter","|").load("dbfs:/FileStore/Input_Data-1.txt")
data2.show()

+--------------------+-------------------+-------------+-------------+---------------------+---------------------+------------------------------------+---------------------------+--------------------+-------------+--------------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+---------------------------+---------------------------+---------------------------+--------------------+----------------+
|           PONumber |RequisitionActionID|outputstateid|outputqueueid|outputstate          |outputqueue          |POStatus                            |CreatedDate                |        Duration_sec|Duration_mins|Duration_hours|Description                                                                                                                                                

In [0]:
data2.createOrReplaceTempView('transactions')

In [0]:
distinctOutput = sqlContext.sql('SELECT * from transactions').show()

+--------------------+-------------------+-------------+-------------+---------------------+---------------------+------------------------------------+---------------------------+--------------------+-------------+--------------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+---------------------------+---------------------------+---------------------------+--------------------+----------------+
|           PONumber |RequisitionActionID|outputstateid|outputqueueid|outputstate          |outputqueue          |POStatus                            |CreatedDate                |        Duration_sec|Duration_mins|Duration_hours|Description                                                                                                                                                

In [0]:
# prepare_data for SparkSQL Analysis manually
data3 = [("100523887","23","6","1","Completed","RequestorProjectOwner","Completed","2023-01-05 03:41:53.903","0","0.0","0.0","What:Participation in event: 1) 2-day coffee sponsorship (for 800 people) 2) Branded reusable cups to take home 3) Platinum Partnership When: June FY23 Where: O2 Universum. Českomoravská 2345/17a, 190 00, Praha 9 Who: Machine Learning Prague s. r. o. (0003024163)","2023-06-01 00:00:00.000","2023-06-02 00:00:00.000","2023-01-05 03:41:53.910","NewPurchaseOrder"),
        ("100523887","20","12","8","SAPUploaded","Procurement","SAPUploaded","2023-01-05 03:41:53.860","4","0.0","0.0","What:Participation in event: 1) 2-day coffee sponsorship (for 800 people) 2) Branded reusable cups to take home 3) Platinum Partnership When: June FY23 Where: O2 Universum. Českomoravská 2345/17a, 190 00, Praha 9 Who: Machine Learning Prague s. r. o. (0003024163)","2023-06-01 00:00:00.000","2023-06-02 00:00:00.000","2023-01-05 03:41:53.910","NewPurchaseOrder"),
        ("100523887","19","11","8","SAPApprovalCompleted","Procurement","PO Approval(Safe Approval Completed)","2023-01-05 03:41:49.657","1","0.0","0.0","What:Participation in event: 1) 2-day coffee sponsorship (for 800 people) 2) Branded reusable cups to take home 3) Platinum Partnership When: June FY23 Where: O2 Universum. Českomoravská 2345/17a, 190 00, Praha 9 Who: Machine Learning Prague s. r. o. (0003024163)","2023-06-01 00:00:00.000","2023-06-02 00:00:00.000","2023-01-05 03:41:53.910","NewPurchaseOrder"),
        ("100523887","18","10","8","SafeApproved","Procurement","PO Approval(Safe Approved)","2023-01-05 03:41:48.570","1890166","31503.0","525.0","What:Participation in event: 1) 2-day coffee sponsorship (for 800 people) 2) Branded reusable cups to take home 3) Platinum Partnership When: June FY23 Where: O2 Universum. Českomoravská 2345/17a, 190 00, Praha 9 Who: Machine Learning Prague s. r. o. (0003024163)","2023-06-01 00:00:00.000","2023-06-02 00:00:00.000","2023-01-05 03:41:53.910","NewPurchaseOrder"),
        ("100523887","2","14","18","Submitted","Procurement","Interim Submitted","2022-12-14 06:39:02.390","22","0.0","0.0","What:Participation in event: 1) 2-day coffee sponsorship (for 800 people) 2) Branded reusable cups to take home 3) Platinum Partnership When: June FY23 Where: O2 Universum. Českomoravská 2345/17a, 190 00, Praha 9 Who: Machine Learning Prague s. r. o. (0003024163)","2023-06-01 00:00:00.000","2023-06-02 00:00:00.000","2023-01-05 03:41:53.910","NewPurchaseOrder"),
        ("100523887","3","25","6","Approved","LegalContractApprover","CELA Contract Approver","2022-12-14 06:38:40.577","782003","13033.0","217.0","What:Participation in event: 1) 2-day coffee sponsorship (for 800 people) 2) Branded reusable cups to take home 3) Platinum Partnership When: June FY23 Where: O2 Universum. Českomoravská 2345/17a, 190 00, Praha 9 Who: Machine Learning Prague s. r. o. (0003024163)","2023-06-01 00:00:00.000","2023-06-02 00:00:00.000","2023-01-05 03:41:53.910","NewPurchaseOrder"),
        ("100523887","7","3","5","Assigned","UnitedStatesBuyCenter","BuyCenter Operations","2022-12-05 05:25:17.870","8289","138.0","2.0","What:Participation in event: 1) 2-day coffee sponsorship (for 800 people) 2) Branded reusable cups to take home 3) Platinum Partnership When: June FY23 Where: O2 Universum. Českomoravská 2345/17a, 190 00, Praha 9 Who: Machine Learning Prague s. r. o. (0003024163)","2023-06-01 00:00:00.000","2023-06-02 00:00:00.000","2023-01-05 03:41:53.910","NewPurchaseOrder"),
        ("100523887","2","3","12","Assigned","SystemProcessing","Submitted(Assigned)","2022-12-05 03:07:08.087","1229","20.0","0.0","What:Participation in event: 1) 2-day coffee sponsorship (for 800 people) 2) Branded reusable cups to take home 3) Platinum Partnership When: June FY23 Where: O2 Universum. Českomoravská 2345/17a, 190 00, Praha 9 Who: Machine Learning Prague s. r. o. (0003024163)","2023-06-01 00:00:00.000","2023-06-02 00:00:00.000","2023-01-05 03:41:53.910","NewPurchaseOrder"),
        ("100523887","1","13","1","Restarted","RequestorProjectOwner","null","2022-12-05 02:46:39.553","337934","5632.0","94.0","What:Participation in event: 1) 2-day coffee sponsorship (for 800 people) 2) Branded reusable cups to take home 3) Platinum Partnership When: June FY23 Where: O2 Universum. Českomoravská 2345/17a, 190 00, Praha 9 Who: Machine Learning Prague s. r. o. (0003024163)","2023-06-01 00:00:00.0000000","2023-06-02 00:00:00.000","2023-01-05 03:41:53.910","NewPurchaseOrder"),
        ("100523887","2","3","12","Assigned","SystemProcessing","Submitted(Assigned)","2022-12-01 04:54:25.253","null","null","null","What:Participation in event: 1) 2-day coffee sponsorship (for 800 people) 2) Branded reusable cups to take home 3) Platinum Partnership When: June FY23 Where: O2 Universum. Českomoravská 2345/17a, 190 00, Praha 9 Who: Machine Learning Prague s. r. o. (0003024163)","2023-06-01 00:00:00.0000000","2023-06-02 00:00:00.000","2023-01-05 03:41:53.910","NewPurchaseOrder")]

columns = ["PONumber","RequisitionActionID","outputstateid","outputqueueid","outputstate","outputqueue","POStatus","CreatedDate","Duration_sec","Duration_mins","Duration_hours","Description","StartDate","EndDate","ModifiedDate","RequisitionType"]

data3_df = spark.createDataFrame(data = data3, schema = columns)
data3_df.show()

+---------+-------------------+-------------+-------------+--------------------+--------------------+--------------------+--------------------+------------+-------------+--------------+--------------------+--------------------+--------------------+--------------------+----------------+
| PONumber|RequisitionActionID|outputstateid|outputqueueid|         outputstate|         outputqueue|            POStatus|         CreatedDate|Duration_sec|Duration_mins|Duration_hours|         Description|           StartDate|             EndDate|        ModifiedDate| RequisitionType|
+---------+-------------------+-------------+-------------+--------------------+--------------------+--------------------+--------------------+------------+-------------+--------------+--------------------+--------------------+--------------------+--------------------+----------------+
|100523887|                 23|            6|            1|           Completed|RequestorProjectO...|           Completed|2023-01-05 03:41:

In [0]:
data3_df.printSchema()

root
 |-- PONumber: string (nullable = true)
 |-- RequisitionActionID: string (nullable = true)
 |-- outputstateid: string (nullable = true)
 |-- outputqueueid: string (nullable = true)
 |-- outputstate: string (nullable = true)
 |-- outputqueue: string (nullable = true)
 |-- POStatus: string (nullable = true)
 |-- CreatedDate: string (nullable = true)
 |-- Duration_sec: string (nullable = true)
 |-- Duration_mins: string (nullable = true)
 |-- Duration_hours: string (nullable = true)
 |-- Description: string (nullable = true)
 |-- StartDate: string (nullable = true)
 |-- EndDate: string (nullable = true)
 |-- ModifiedDate: string (nullable = true)
 |-- RequisitionType: string (nullable = true)



In [0]:
data3_df.createOrReplaceTempView('transact1')

In [0]:
sqlContext.sql('SELECT * from transact1').show()

+---------+-------------------+-------------+-------------+--------------------+--------------------+--------------------+--------------------+------------+-------------+--------------+--------------------+--------------------+--------------------+--------------------+----------------+
| PONumber|RequisitionActionID|outputstateid|outputqueueid|         outputstate|         outputqueue|            POStatus|         CreatedDate|Duration_sec|Duration_mins|Duration_hours|         Description|           StartDate|             EndDate|        ModifiedDate| RequisitionType|
+---------+-------------------+-------------+-------------+--------------------+--------------------+--------------------+--------------------+------------+-------------+--------------+--------------------+--------------------+--------------------+--------------------+----------------+
|100523887|                 23|            6|            1|           Completed|RequestorProjectO...|           Completed|2023-01-05 03:41:

In [0]:
sqlContext.sql('select PONumber from transact1').show()

+---------+
| PONumber|
+---------+
|100523887|
|100523887|
|100523887|
|100523887|
|100523887|
|100523887|
|100523887|
|100523887|
|100523887|
|100523887|
+---------+



In [0]:
# Select distinct output state as per latest creation date
distinct_opstate = sqlContext.sql('select distinct outputstate,CreatedDate desc from transact1').show()

+--------------------+--------------------+
|         outputstate|                desc|
+--------------------+--------------------+
|           Completed|2023-01-05 03:41:...|
|         SAPUploaded|2023-01-05 03:41:...|
|SAPApprovalCompleted|2023-01-05 03:41:...|
|           Submitted|2022-12-14 06:39:...|
|        SafeApproved|2023-01-05 03:41:...|
|            Approved|2022-12-14 06:38:...|
|            Assigned|2022-12-05 05:25:...|
|            Assigned|2022-12-05 03:07:...|
|            Assigned|2022-12-01 04:54:...|
|           Restarted|2022-12-05 02:46:...|
+--------------------+--------------------+



In [0]:
# Least salary from department using Spark SQL 
sc.setLogLevel("ERROR")
from pyspark.sql.window import Window
from pyspark.sql.functions import rank
from pyspark.sql.types import *

employees_Salary = [("James","Sales","22000"),
("sofy", "Sales", "36000"),
("Laren", "Sales", "84000"),
("Kiku", "Sales", "55000"),
("Sam", "Finance", "60000"),
("Samuel", "Finance", "70000"),
("Yash", "Finance","20000"),
("Rabin", "Finance", "10000"),
("Lukasz", "Marketing", "1000"),
("Jolly", "Marketing", "11000"),
("Mausam", "Marketing", "12000"),
("Lamba", "Marketing", "13000"),
("Jogesh", "HR", "14000"),
("Mannu", "HR", "15000"),
("Sylvia", "HR", "16000"),
("Sama", "HR", "17000"),
]
columns = ["employee_name","dept_name","salary"]
employeesDF = spark.createDataFrame(data = employees_Salary, schema = columns)
employeesDF.show()

+-------------+---------+------+
|employee_name|dept_name|salary|
+-------------+---------+------+
|        James|    Sales| 22000|
|         sofy|    Sales| 36000|
|        Laren|    Sales| 84000|
|         Kiku|    Sales| 55000|
|          Sam|  Finance| 60000|
|       Samuel|  Finance| 70000|
|         Yash|  Finance| 20000|
|        Rabin|  Finance| 10000|
|       Lukasz|Marketing|  1000|
|        Jolly|Marketing| 11000|
|       Mausam|Marketing| 12000|
|        Lamba|Marketing| 13000|
|       Jogesh|       HR| 14000|
|        Mannu|       HR| 15000|
|       Sylvia|       HR| 16000|
|         Sama|       HR| 17000|
+-------------+---------+------+



In [0]:
windowPartition = Window.partitionBy("dept_name").orderBy("salary")
employeeDF = employeesDF.withColumn("rank",rank().over(windowPartition)).filter("rank=1").show()

+-------------+---------+------+----+
|employee_name|dept_name|salary|rank|
+-------------+---------+------+----+
|        Rabin|  Finance| 10000|   1|
|       Jogesh|       HR| 14000|   1|
|       Lukasz|Marketing|  1000|   1|
|        James|    Sales| 22000|   1|
+-------------+---------+------+----+



In [0]:
from pyspark.sql.functions import to_date,date_format,to_timestamp,expr
from pyspark.sql.functions import unix_timestamp
from pyspark.sql.functions import col,round
from pyspark.sql.types import *
Durationhoursalc_hours1 = spark.sql("select *, case when outputqueue='Procurement' then concat(outputqueue,outputstate) else outputqueue end as outputqueuelatest from transact1")
Durationhoursalc_hours1.withColumn("last_queue_exit",expr("lag(outputqueuelatest) over (partition by RequisitionActionID order by CreatedDate desc)")).createOrReplaceTempView("df4")
Durationcal=spark.sql("select * from df4 where (last_queue_exit != outputqueuelatest OR last_queue_exit is null)")
df2=Durationcal.withColumn("next_date_exit",expr("lead(CreatedDate) over (partition by RequisitionActionID order by CreatedDate desc)"))
#df2.withColumn("Duration_sec",unix_timestamp(date_format(col("CreatedDate"),"yyyy-MM-dd HH:mm:ss.SSS"),"yyyy-MM-dd HH:mm:ss.SSS")).withColumn("Duration_mins",round(col("Duration_sec")/60)).withColumn("Duration_hours",round(col("Duration_sec")/3600))
df2.show()

+---------+-------------------+-------------+-------------+--------------------+--------------------+--------------------+--------------------+------------+-------------+--------------+--------------------+--------------------+--------------------+--------------------+----------------+--------------------+--------------------+--------------------+
| PONumber|RequisitionActionID|outputstateid|outputqueueid|         outputstate|         outputqueue|            POStatus|         CreatedDate|Duration_sec|Duration_mins|Duration_hours|         Description|           StartDate|             EndDate|        ModifiedDate| RequisitionType|   outputqueuelatest|     last_queue_exit|      next_date_exit|
+---------+-------------------+-------------+-------------+--------------------+--------------------+--------------------+--------------------+------------+-------------+--------------+--------------------+--------------------+--------------------+--------------------+----------------+--------------

In [0]:
year = int(input("Input a year: "))

if (year % 400 == 0):
    leap_year = True
elif (year % 100 == 0):
    leap_year = False
elif (year % 4 == 0):
    leap_year = True
else:
    leap_year = False

month = int(input("Input a month [1-12]: "))

if month in (1, 3, 5, 7, 8, 10, 12):
    month_length = 31
elif month == 2:
    if leap_year:
        month_length = 29
    else:
        month_length = 28
else:
    month_length = 30


day = int(input("Input a day [1-31]: "))

if day < month_length:
    day -= 1
else:
    day = 1
    if month == 12:
        month = 1
        year += 1
    else:
        month += 1
print("The previous date is [yyyy-mm-dd] %d-%d-%d." % (year, month, day))

Input a year:  2023

Input a month [1-12]:  04

Input a day [1-31]:  13

The previous date is [yyyy-mm-dd] 2023-4-12.
