In [None]:
import os

In [None]:
# Export SPARK_LOCAL_IP as the loopback address for this process.
# NOTE(review): presumably this must run before the SparkSession is created
# for Spark to pick it up -- confirm.
os.environ.update({"SPARK_LOCAL_IP": "127.0.0.1"})

In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType,StructField, StringType, IntegerType, DateType, TimestampType, DoubleType

In [None]:
# Build (or reuse) the Spark session used by the whole report.
#
# The S3A credentials are taken from the standard AWS environment variables
# instead of being embedded in the notebook: a notebook is routinely shared
# and committed, so literal keys in a cell are effectively public.
# A missing variable raises KeyError here, before any S3 access is attempted.
# NOTE(review): the key pair that was previously hard-coded in this cell
# should be considered leaked and rotated.
aws_access_key_id = os.environ["AWS_ACCESS_KEY_ID"]
aws_secret_access_key = os.environ["AWS_SECRET_ACCESS_KEY"]

spark = (
    SparkSession.builder
    .appName('Report 1 : Operations Management Report')
    # hadoop-aws provides the s3a:// filesystem used by every read below.
    .config("spark.jars.packages", "org.apache.hadoop:hadoop-aws:3.0.0")
    .config('spark.hadoop.fs.s3a.aws.credentials.provider', 'org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider')
    .config("spark.hadoop.fs.s3a.access.key", aws_access_key_id)
    .config("spark.hadoop.fs.s3a.secret.key", aws_secret_access_key)
    .getOrCreate()
)

In [None]:
# Explicit schema for the Results data set. Columns are listed in positional
# order; all are nullable. Only "Id" and "BoardId" are integers, every other
# column is kept as a string.
_results_integer_columns = {"Id", "BoardId"}
_results_column_names = [
    "Id",
    "Status",
    "BoardId",
    "BatchId",
    "WorkOrderId",
    "RoutingStageId",
    "RoutingStageName",
    "Operator",
    "Deviation",
    "InspectionDate",
    "LastModifiedDate",
    "ReInspectionNeeded",
    "PreviouslySannedBoards",
    "RoutingStatus",
    "CavityID",
    "SubWorkCenter",
    "StationCode",
    "StationName",
    "TrayId",
    "AssetSubNodeId",
    "CollectionId",
    "Company",
    "Division",
]
resultsSchema = StructType([
    StructField(
        column_name,
        IntegerType() if column_name in _results_integer_columns else StringType(),
        True,
    )
    for column_name in _results_column_names
])

In [None]:
# Explicit schema for the Workorders data set: every column is a nullable
# string, listed in positional order.
_work_order_column_names = [
    "Id",
    "ItemId",
    "LineNo",
    "Description",
    "Quantity",
    "Started",
    "StartDate",
    "EndDate",
    "EcnNo",
    "EcnQunatity",
    "EcnStatus",
    "ProductRevision",
    "PlannedStartDate",
    "PlannedEndDate",
    "Isblocked",
    "BlockedDate",
    "BlockedBy",
    "BatchProceedStatus",
    "WorkOrderClosureStatus",
    "ShortClosedQuantity",
    "CreationDate",
    "DysonPONumber",
    "CustomerSKUNumber",
    "RoutingVersionId",
    "RoutingHeaderId",
    "ERPClosureStatus",
    "FeederReloadLockRequired",
    "MSDLockRequired",
    "Unit Price",
    "AllowCustomerRefNoRepetition",
    "Company",
    "Division",
]
workOrdersSchema = StructType(
    [StructField(column_name, StringType(), True) for column_name in _work_order_column_names]
)

In [None]:
from pyspark.sql.functions import hour
from pyspark.sql.functions import date_format
from pyspark.sql.functions import countDistinct

In [None]:
# Shift-wise plan: a pipe-delimited CSV whose first row holds the column names.
# No schema is supplied, so the columns are read with Spark's defaults.
plans_df = spark.read.load(
    "s3a://hackathon2023/data/OperationsManagement/PlansShiftWise/PlansShiftWise.csv",
    format="csv",
    header="true",
    delimiter="|",
)

In [None]:
# Inspection results: comma-delimited CSV without a header row, so column
# names and types come from resultsSchema. The former inferSchema=True option
# was removed: it has no effect once an explicit schema is supplied and only
# suggested that types were being inferred.
results_df = (
    spark.read
    .format("csv")
    .option("header", "False")
    .option("delimiter", ",")
    .schema(resultsSchema)
    .load("s3a://hackathon2023/data/OperationsManagement/Results/Results.csv")
)

# Routing stages: Parquet files carry their own schema, so no schema-inference
# option applies to this reader.
routing_df = spark.read.parquet(
    "s3a://hackathon2023/data/OperationsManagement/RoutingStages/RoutingStages.parquet"
)

# Keep only results that match a routing row on both routing stage and work
# order, then drop routing's copy of WorkOrderId so only the results-side
# column remains.
combined_df = (
    results_df
    .join(
        routing_df,
        [
            results_df.RoutingStageId == routing_df.id,
            results_df.WorkOrderId == routing_df.WorkOrderId,
        ],
        "inner",
    )
    .drop(routing_df.WorkOrderId)
)

In [None]:
# Work orders: tab-delimited file without a header row, typed by
# workOrdersSchema. The former inferSchema=True option was removed because it
# has no effect alongside an explicit schema.
work_orders_df = (
    spark.read
    .format("csv")
    .option("header", "false")
    .option("delimiter", "\t")
    .schema(workOrdersSchema)
    .load("s3a://hackathon2023/data/OperationsManagement/Workorders/Workorders.csv")
)


In [None]:
# Attach work-order columns to each combined row. A left outer join keeps
# rows that have no matching work order.
# NOTE: this cell reassigns combined_df, so it is meant to be run once per
# kernel session, in order.
work_order_match = combined_df.WorkOrderId == work_orders_df.Id
combined_df = combined_df.join(work_orders_df, on=work_order_match, how="left_outer")

In [None]:
# Keep only the rows whose Surface column equals 1.
# NOTE(review): Surface is not part of the CSV schemas defined in this
# notebook; presumably it comes from the routing-stages Parquet file -- confirm.
combined_df = combined_df.where(combined_df["Surface"] == 1)

In [None]:
# Actual output per item, sub work center, calendar date and hour of day:
# the number of distinct boards whose LastModifiedDate falls in that bucket.
modified_hour = hour(combined_df.LastModifiedDate).alias("Hour")
modified_date = date_format(combined_df.LastModifiedDate, "yyyy-MM-dd").alias("Date")
distinct_boards = countDistinct("BoardId").alias("ActualQuantity")

actual_df = (
    combined_df
    .groupBy("ItemId", "SubWorkCenter", modified_hour, modified_date)
    .agg(distinct_boards)
)

In [None]:
# Pair each actual-quantity bucket with the plan row for the same item,
# station, hour and date. The inner join discards buckets without a plan
# and plans without any actual output.
same_item = actual_df.ItemId == plans_df.ItemNo
same_station = actual_df.SubWorkCenter == plans_df.Station
same_hour = actual_df.Hour == hour(plans_df.Hour)
same_date = actual_df.Date == date_format(plans_df.Date, "yyyy-MM-dd")

combined_df = actual_df.join(
    plans_df,
    same_item & same_station & same_hour & same_date,
    "inner",
)

In [None]:
# Items arrive as unstructured text: each line is loaded into a single string
# column named "value" and parsed into fields further down.
items_df = (
    spark.read
    .format("text")
    .load("s3a://hackathon2023/data/OperationsManagement/Items/Items.txt")
)

In [None]:
from pyspark.sql.types import *
import re
from pyspark.sql.functions import udf,col,struct

In [None]:
# Regular expression that splits one raw Items line into its fields. It has
# twelve capturing groups, separated by fixed marker tokens and one
# non-captured date/time stamp; the groups line up positionally with the
# twelve fields of `schema` below.
pattern = r"C[0-9]+(.+?)UU-(.+?)nxklh2022(.+?)-.+?1(.+?)\\R\$\$(.+?)plantxi12(.+?)(?:(?:\d{2}){1,2}[/-]\d{1,2}[/-](?:\d{2}){1,2})\s\d{1,2}:\d{2}:\d{2}(.+?)k8(.+?)bHM(.+?)--(.+?)P011(.+?)MD(.+)"

# Struct type of a parsed Items line: twelve nullable string fields, in the
# same order as the capturing groups of `pattern`.
_item_field_names = [
    "ID",
    "Description",
    "Modality",
    "Revision",
    "BaseUOM",
    "Batch_Management",
    "SerialNumber_Profile",
    "ShelfLife",
    "ShelfLife_Date",
    "MSD",
    "Item_Category",
    "MSLDetails",
]
schema = StructType(
    [StructField(field_name, StringType(), True) for field_name in _item_field_names]
)

In [None]:
def extract_values(s, regex=None):
    """Split one raw Items line into its captured fields.

    Args:
        s: The raw line to parse. ``None`` is tolerated so that a null
            input does not raise inside a Spark task.
        regex: Optional pattern (string or compiled) to use instead of the
            module-level ``pattern``. Defaults to ``None``, which keeps the
            original single-argument behaviour.

    Returns:
        A tuple holding the text of every capturing group when the pattern
        matches at the start of ``s``; ``None`` when ``s`` is ``None`` or
        does not match.
    """
    # re.match raises TypeError on None; treat a null line as "no match".
    if s is None:
        return None
    m = re.match(pattern if regex is None else regex, s)
    # Match.groups() already returns a tuple.
    return m.groups() if m else None
# Wrap the parser as a UDF returning a struct shaped like `schema`.
extract_values_udf = udf(extract_values, schema)

# Parse every raw line into a struct, then expand the struct into one
# top-level column per field (ID ... MSLDetails, in schema order). The star
# expansion replaces the former per-field withColumn calls (one of which,
# MSLDetails, was duplicated) and the follow-up drops of "value" and
# "structured_data"; the resulting columns are the same.
# Lines that do not match the pattern yield a row of nulls.
items_df = (
    items_df
    .select(extract_values_udf(items_df.value).alias("structured_data"))
    .select("structured_data.*")
)

In [None]:
# Enrich the plan-vs-actual rows with item master data; the left outer join
# keeps rows whose ItemId has no parsed item record.
item_match = combined_df.ItemId == items_df.ID
final_df = combined_df.join(items_df, on=item_match, how="left_outer")

In [None]:
# Preview the report: first 20 rows, long cell values truncated
# (the defaults of DataFrame.show, spelled out).
final_df.show(n=20, truncate=True)