In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.types import *
from pyspark.sql import Row

In [2]:
spark = SparkSession.builder.appName("OPERATION-MANAGEMENT").getOrCreate()


In [3]:
from pyspark.sql.functions import *

df = spark.read.csv("../3_STORAGE_LAYER/DATALAKEHOUSE_BRONZE_LAYER/plantshift.csv", header=True, inferSchema=True)

df = df.select("Quantity", "Date", "Hour", "Bay")

# Extract the hour value from the Hour column and create a new column for it
df = df.withColumn("hour", substring(col("Hour"), 0, 2))



In [4]:
df = df.withColumnRenamed("Quantity", "Planned production")\
        .withColumnRenamed("Date", "date")\
        .withColumnRenamed("Bay", "lane values")

In [5]:
step1_df = df.select("Planned production", "date", "hour", "lane values")


In [6]:
step1_df.show()


+------------------+-------------------+----+-----------+
|Planned production|               date|hour|lane values|
+------------------+-------------------+----+-----------+
|                21|2022-06-01 00:00:00|  19|    EE-0001|
|                21|2022-06-01 00:00:00|  19|    EE-0001|
|                21|2022-06-01 00:00:00|  19|    EE-0001|
|                21|2022-06-01 00:00:00|  19|    EE-0001|
|                22|2022-06-01 00:00:00|  19|    EE-0001|
|                22|2022-06-01 00:00:00|  19|    EE-0001|
|                22|2022-06-01 00:00:00|  19|    EE-0001|
|                22|2022-06-01 00:00:00|  19|    EE-0001|
|                22|2022-06-01 00:00:00|  19|    EE-0001|
|                21|2022-06-01 00:00:00|  19|    TS-0001|
|                21|2022-06-01 00:00:00|  19|    TS-0001|
|                21|2022-06-01 00:00:00|  19|    TS-0001|
|                21|2022-06-01 00:00:00|  19|    TS-0001|
|                21|2022-06-01 00:00:00|  19|    TS-0001|
|             

In [7]:
resultdf = spark.read.csv('../3_STORAGE_LAYER/DATALAKEHOUSE_BRONZE_LAYER/result.csv', header=True, inferSchema=True)
routingdf = spark.read.csv('../3_STORAGE_LAYER/DATALAKEHOUSE_BRONZE_LAYER/routing1.csv', header=True, inferSchema=True)

common_columns = set(resultdf.columns).intersection(routingdf.columns)
common_columns = list(common_columns)
routingdf= routingdf.drop(*common_columns)
step2df = resultdf.join(routingdf, resultdf.RoutingStageId == routingdf.id, "inner")





print(step2df.columns)
print(common_columns)



num_rows = step2df.count()
num_cols = len(step2df.columns)
print(num_rows,num_cols)


['Id', 'Status', 'BoardId', 'BatchId', 'WorkOrderId', 'RoutingStageId', 'RoutingStageName', 'Operator', 'Deviation', 'InspectionDate', 'LastModifiedDate', 'ReInspectionNeeded', 'PreviouslySannedBoards', 'RoutingStatus', 'CavityID', 'SubWorkCenter', 'StationCode', 'StationName', 'TrayId', 'AssetSubNodeId', 'CollectionId', 'Company', 'Division', 'id', 'PlacementOpportunities', 'TerminationOpportunities', 'AssemblyOpportunities', 'ComponentOpportunities', 'UpperLimit', 'DisplayOrder', 'OperationNo', 'NextOperationNo', 'PreviousOperationNo', 'Name', 'EstimatedTime', 'OnlineInspection', 'ICTStage', 'ProcessCheck', 'QAPercentage', 'HasSA', 'saValidationMethod', 'Surface', 'InspectionMode', 'TrayReleasingStage', 'TimeLineStartStage', 'TimeLineActionMode', 'TimeLineActionInstance', 'AutoRejectLimit', 'ReRoutingActionStage', 'OfflineAction', 'OfflineProcessDuration', 'TimeLineDuration', 'PCBStarting', 'CleaningDelay', 'ReRoutingStage', 'AOIStage', 'IsMachineTraceabilityRequired', 'QaAfterRework

In [8]:
from pyspark.sql import functions as F

workorderdf = spark.read.csv("../3_STORAGE_LAYER/DATALAKEHOUSE_BRONZE_LAYER/workorder.csv", header=True, inferSchema=True)

workorderdf = workorderdf.withColumnRenamed("Id", "Id_x")
# find common columns
workorder_columns = workorderdf.columns
step2_columns = step2df.columns
common_columns = list(set(workorder_columns).intersection(step2_columns))
print(common_columns)
workorderdf= workorderdf.drop(*common_columns)




# join dataframes
step3df = step2df.join(workorderdf, step2df.WorkOrderId == workorderdf.Id_x, "left")
step3df=step3df.drop("Id_x")


# filter by Surface
step3df = step3df.filter(step3df["Surface"] == 1)

[]


In [9]:
num_rows = step3df.count()
num_cols = len(step3df.columns)
print(num_rows,num_cols)
step3df.show()

55149 119
+-----+------+--------------------+--------------------+-----------+--------------------+--------------------+--------+---------+-------------------+-------------------+------------------+----------------------+-------------+--------+-------------+-----------+-----------+------+--------------+------------+-------+--------+--------------------+----------------------+------------------------+---------------------+----------------------+----------+------------+-----------+---------------+-------------------+--------------------+-------------+----------------+--------+------------+------------+-----+------------------+-------+--------------+------------------+------------------+------------------+----------------------+---------------+--------------------+-------------+----------------------+----------------+-----------+-------------+--------------+--------+-----------------------------+-------------+-------------------------+-------------+---------------+----------------+-------

In [10]:
from pyspark.sql import functions as F

# Convert the "LastModifiedDate" column to timestamp type
step3df = step3df.withColumn("LastModifiedDate", F.to_timestamp("LastModifiedDate", "yyyy-MM-dd HH:mm:ss"))

# Extract hour and day from the timestamp and add them as new columns
step3df = step3df.withColumn("LastModifiedDate_Hour", F.hour("LastModifiedDate"))
step3df = step3df.withColumn("LastModifiedDate_Date", F.dayofmonth("LastModifiedDate"))

# Groupby ItemId, SubWorkCenter, LastModifiedDate_Date and LastModifiedDate_Hour and count the number of BoardId
step4df = step3df.groupBy("ItemId", "SubWorkCenter", "LastModifiedDate_Date","LastModifiedDate_Hour").agg(F.count("BoardId"))

In [11]:
num_rows = step4df.count()
num_cols = len(step4df.columns)
print(num_rows,num_cols)
step4df.show()

5987 5
+--------------------+-------------+---------------------+---------------------+--------------+
|              ItemId|SubWorkCenter|LastModifiedDate_Date|LastModifiedDate_Hour|count(BoardId)|
+--------------------+-------------+---------------------+---------------------+--------------+
|Items-EE-07197264-UK|      EE-0001|                    1|                   19|             8|
|Items-BC-03055426-CH|      BC-0001|                    1|                   11|             3|
|Items-EP-03621157-BR|      EP-0001|                    2|                   10|             4|
|Items-EE-03921377-JP|      EE-0001|                    1|                   22|             5|
|Items-TS-00689461-BR|      TS-0001|                    2|                    0|             2|
|Items-TS-01948203-UK|      TS-0001|                    1|                   20|             9|
|Items-EP-04536209-JP|      EP-0001|                    1|                   14|             6|
|Items-EP-04536209-JP|      EP-00

In [12]:
df = spark.read.csv("../3_STORAGE_LAYER/DATALAKEHOUSE_BRONZE_LAYER/plantshift.csv", header=True, inferSchema=True)
# Extract hour and day from the timestamp and add them as new columns
df = df.withColumn("Hour_Hour", hour(to_timestamp("Hour", "yyyy-MM-dd HH:mm:ss")))
df = df.withColumn("Date_Date", dayofmonth(to_timestamp("Date", "yyyy-MM-dd")))

# rename columns to match with the columns of step4df
df = df.withColumnRenamed("ItemNo", "ItemId")
df = df.withColumnRenamed("Station", "SubWorkCenter")
df = df.withColumnRenamed("Hour_Hour", "LastModifiedDate_Hour")
df = df.withColumnRenamed("Date_Date", "LastModifiedDate_Date")

# Use the join method to combine the data on the common columns
step5df = step4df.join(df, 
                      on=['ItemId', 'SubWorkCenter', 'LastModifiedDate_Hour', 'LastModifiedDate_Date'] , 
                      how='inner') 

In [13]:
num_rows = step5df.count()
num_cols = len(step5df.columns)
print(num_rows,num_cols)
step5df.show()

2369 18
+--------------------+-------------+---------------------+---------------------+--------------+---+-------------------+-----+-------------------+--------+---------------+-------+----------+-------------------+---------+-----------+-------+--------+
|              ItemId|SubWorkCenter|LastModifiedDate_Hour|LastModifiedDate_Date|count(BoardId)| ID|               Date|Shift|               Hour|Quantity|     LineLeader|    Bay|DivisionId|   CreationDateTime|BreakTime|WorkOrderId|Company|Division|
+--------------------+-------------+---------------------+---------------------+--------------+---+-------------------+-----+-------------------+--------+---------------+-------+----------+-------------------+---------+-----------+-------+--------+
|Items-SB-03591404-JP|      SB-0001|                   11|                    2|            12|513|2022-06-02 00:00:00|    G|1900-01-01 11:00:00|      23|SB_LineLeader_1|SB-0001|         3|2022-06-02 03:02:00|        0|   W-472205|   null|    nu

In [14]:
itemsdf = spark.read.csv("../3_STORAGE_LAYER/DATALAKEHOUSE_BRONZE_LAYER/items.csv", header=True, inferSchema=True)
num_rows = itemsdf.count()
num_cols = len(itemsdf.columns)
print(num_rows,num_cols)
itemsdf.show()

99 12
+----+--------------------+--------+--------+-------+----------------+--------------------+---------+--------------+----+-------------+----------+
|  ID|         Description|Modality|Revision|BaseUOM|Batch_Management|SerialNumber_Profile|ShelfLife|ShelfLife_Date| MSD|Item_Category|MSLDetails|
+----+--------------------+--------+--------+-------+----------------+--------------------+---------+--------------+----+-------------+----------+
|C530|Items-SB-06861728-JP|      SB|    None|   None|            None| 03/01/2023 35:19:32|     None|          None|None|        SB_JP|      None|
|C541|Items-EE-04270935-CH|      EE|    None|   None|            None| 03/01/2023 35:19:32|     None|          None|None|        EE_CH|      None|
|C749|Items-EP-02565413-IN|      EP|    None|   None|            None| 2023-01-03 35:19:32|     None|          None|None|        EP_IN|      None|
|C990|Items-BC-06949570-JP|      BC|    None|   None|            None| 2023-01-03 35:19:32|     None|          N

In [15]:
# find common columns
items_columns = itemsdf.columns
step5_columns = step5df.columns
common_columns = list(set(items_columns).intersection(step5_columns))
print(common_columns)
# find common columns
itemsdf= itemsdf.drop(*common_columns)

['ID']


In [16]:
step6df = step5df.join(itemsdf, step5df.ItemId == itemsdf.Description, "left")

In [17]:
step6df

DataFrame[ItemId: string, SubWorkCenter: string, LastModifiedDate_Hour: int, LastModifiedDate_Date: int, count(BoardId): bigint, ID: int, Date: timestamp, Shift: string, Hour: timestamp, Quantity: int, LineLeader: string, Bay: string, DivisionId: int, CreationDateTime: timestamp, BreakTime: int, WorkOrderId: string, Company: string, Division: string, Description: string, Modality: string, Revision: string, BaseUOM: string, Batch_Management: string, SerialNumber_Profile: string, ShelfLife: string, ShelfLife_Date: string, MSD: string, Item_Category: string, MSLDetails: string]

In [18]:
num_rows = step6df.count()
num_cols = len(step6df.columns)
print(num_rows,num_cols)
step6df.show()

2369 29
+--------------------+-------------+---------------------+---------------------+--------------+---+-------------------+-----+-------------------+--------+---------------+-------+----------+-------------------+---------+-----------+-------+--------+--------------------+--------+--------+-------+----------------+--------------------+---------+--------------+----+-------------+----------+
|              ItemId|SubWorkCenter|LastModifiedDate_Hour|LastModifiedDate_Date|count(BoardId)| ID|               Date|Shift|               Hour|Quantity|     LineLeader|    Bay|DivisionId|   CreationDateTime|BreakTime|WorkOrderId|Company|Division|         Description|Modality|Revision|BaseUOM|Batch_Management|SerialNumber_Profile|ShelfLife|ShelfLife_Date| MSD|Item_Category|MSLDetails|
+--------------------+-------------+---------------------+---------------------+--------------+---+-------------------+-----+-------------------+--------+---------------+-------+----------+-------------------+-----

In [19]:
#step6df.write.csv("../4_METADATA_LAYER/LAKEHOUSE_SILVER_LAYER/step6.csv")

In [24]:
import pandas as pd
# Enable Arrow-based columnar data transfers
spark.conf.set("spark.sql.execution.arrow.enabled", "true")
step6df = step6df.toPandas()

In [27]:
step6df.to_csv("step6.csv")