## Import necessary modules 

In [11]:
import json
from pyspark.sql import SparkSession
from pyspark.sql import DataFrame
from pyspark.sql import functions as F
from pyspark.sql.functions import lit, sum,col, trim
from pyspark.sql.window import Window 

## Initialize SparkSession

In [12]:
# Build (or reuse) the notebook's SparkSession.
# The SQL Server JDBC driver jar is added to the driver classpath via
# spark.driver.extraClassPath.
spark = (
    SparkSession.builder
    .appName("Integration")
    .config("spark.driver.extraClassPath", "/home/jovyan/drivers/mssql-jdbc-11.2.0.jre8.jar")
    .getOrCreate()
)

## Database connection properties for source and target

In [14]:
# Load the per-environment settings from config.json (resolved against the
# notebook's working directory).
with open('config.json') as config_file:
    config = json.load(config_file)

# Environment whose settings are used (e.g., 'development' or 'production')
environment = 'development'

# Pick the selected environment's section once, then pull out the two
# property dicts that are later passed to spark.read.jdbc / DataFrame.write.jdbc.
env_settings = config[environment]
source_connection_properties = env_settings['source_connection_properties']
target_connection_properties = env_settings['target_connection_properties']


## PODetail Integration
* __Custom queries for PODetail's source and target tables__
* __Load data from source and target PODetail's tables__
* __Filter data__
* __Apply necessary transformations__
* __Write filtered data to target table__

In [26]:
# Custom queries for source and target tables.
# Each query is written as a derived table ("( ... ) AS temp") because it is
# handed to spark.read.jdbc() through the `table` argument.
# NOTE(review): 128 is a hard-coded CreatedBy value — presumably the id of the
# integration user; confirm and consider moving it to config.
pod_source_query = """
    (
    SELECT OrId, PO, StyleID, BuyMonth, ColorCode, ColorDesc,128 AS CreatedBy  
    FROM [NCPMS].[dbo].[CutOrdSheetDtl]
    GROUP BY OrId, PO, StyleID, BuyMonth, ColorCode, ColorDesc
    ) AS temp
"""

# Only the key columns are read from the target; they are all the anti-join needs.
pod_target_query = """
    (
    SELECT DISTINCT OrId,PO,ColorCode FROM [Packing].[PODetail] 
    ) AS temp
"""


# Load data from source and target tables
pod_source_df = spark.read.jdbc(url=source_connection_properties['url'],
                                table=pod_source_query,
                                properties=source_connection_properties)

pod_target_df = spark.read.jdbc(url=target_connection_properties['url'],
                                table=pod_target_query,
                                properties=target_connection_properties)


# Keep only the source rows whose (OrId, PO, ColorCode) key is not yet present
# in the target table.
# eqNullSafe (SQL <=>) is used instead of ==: with ==, a NULL on either side
# makes the comparison NULL, so a source row with a NULL key column would never
# match its already-loaded copy and would be appended again on every run.
pod_filtered_df = pod_source_df.join(
    pod_target_df,
    pod_source_df["OrId"].eqNullSafe(pod_target_df["OrId"])
    & pod_source_df["PO"].eqNullSafe(pod_target_df["PO"])
    & pod_source_df["ColorCode"].eqNullSafe(pod_target_df["ColorCode"]),
    "left_anti",
).orderBy("OrId", "PO", "ColorCode")

# No further transformations are applied to the dataframe at present.


# Count once and reuse the number for both the log line and the write guard;
# a separate isEmpty() call would launch another Spark job and re-run both
# JDBC reads just to learn the same thing.
pod_new_row_count = pod_filtered_df.count()
print(f"Number of records in dataframe :  {pod_new_row_count}")

# # Limit number of records (uncomment to cap the batch while testing)
# pod_filtered_df=pod_filtered_df.limit(10)


# Append the new rows only when there is something to write
if pod_new_row_count > 0:
    pod_filtered_df.write.jdbc(url=target_connection_properties['url'],
                               table="[Packing].[PODetail]",
                               mode="append",
                               properties=target_connection_properties)
else:
    print("No data to write to target table.")




Number of records in dataframe :  0
No data to write to target table.


## UpcMappingDetail Integration
* __Custom queries for UpcMappingDetail's source and target tables__
* __Load data from source and target UpcMappingDetail's tables__
* __Filter data__
* __Apply necessary transformations__
* __Write filtered data to target table__

In [25]:
# Custom queries for source and target tables.
# Each query is written as a derived table ("( ... ) AS temp") because it is
# handed to spark.read.jdbc() through the `table` argument.
# NOTE(review): the source query joins to [ActiveSooperWizerNCL].[Packing].[PODetail]
# by hard-coded database name, while the rows are written to whatever database
# target_connection_properties points at — confirm these are the same database
# in every environment.
# NOTE(review): ReceivedQty and OrderQty are both SUM(cos.OrderQty) — confirm
# that seeding ReceivedQty with the ordered quantity is intended.
upc_source_query = """
    (
    SELECT po.PODetailID,cos.ItemSize,SUM(cos.OrderQty) as ReceivedQty,cos.DetailUPC,SUM(cos.OrderQty) as OrderQty,
    MAX(cos.DescriptionDtl) as DescriptionDtl,MAX(cos.HTS) as HTS,128 AS CreatedBy
    FROM [NCPMS].[dbo].[CutOrdSheetDtl] AS cos
    JOIN [ActiveSooperWizerNCL].[Packing].[PODetail] AS po
    ON cos.OrId = po.OrId AND cos.PO = po.PO AND cos.ColorCode = po.ColorCode
    GROUP BY po.PODetailID,cos.ItemSize,cos.DetailUPC
    ) AS temp
"""

# Only the key columns are read from the target; they are all the anti-join needs.
upc_target_query = """
    (
    SELECT DISTINCT PODetailID,ItemSize,DetailUPC FROM [Packing].[UpcMappingDetail] 
    ) AS temp
"""


# Load data from source and target tables
upc_source_df = spark.read.jdbc(url=source_connection_properties['url'],
                                table=upc_source_query,
                                properties=source_connection_properties)

upc_target_df = spark.read.jdbc(url=target_connection_properties['url'],
                                table=upc_target_query,
                                properties=target_connection_properties)


# Keep only the source rows whose (PODetailID, ItemSize, DetailUPC) key is not
# yet present in the target table.
# eqNullSafe (SQL <=>) is used instead of ==: with ==, a NULL on either side
# makes the comparison NULL, so a source row with a NULL ItemSize or DetailUPC
# would never match its already-loaded copy and would be appended again on
# every run.
upc_filtered_df = upc_source_df.join(
    upc_target_df,
    upc_source_df["PODetailID"].eqNullSafe(upc_target_df["PODetailID"])
    & upc_source_df["ItemSize"].eqNullSafe(upc_target_df["ItemSize"])
    & upc_source_df["DetailUPC"].eqNullSafe(upc_target_df["DetailUPC"]),
    "left_anti",
).orderBy("PODetailID", "ItemSize")

# No further transformations are applied to the dataframe at present.


# Count once and reuse the number for both the log line and the write guard;
# a separate isEmpty() call would launch another Spark job and re-run both
# JDBC reads just to learn the same thing.
upc_new_row_count = upc_filtered_df.count()
print(f"Number of records in dataframe :  {upc_new_row_count}")

# # Limit number of records (uncomment to cap the batch while testing)
# upc_filtered_df=upc_filtered_df.limit(10)


# Append the new rows only when there is something to write
if upc_new_row_count > 0:
    upc_filtered_df.write.jdbc(url=target_connection_properties['url'],
                               table="[Packing].[UpcMappingDetail]",
                               mode="append",
                               properties=target_connection_properties)
else:
    print("No data to write to target table.")




Number of records in dataframe :  0
No data to write to target table.


## ShipDetail Integration
* __Custom queries for ShipDetail's source and target tables__
* __Load data from source and target ShipDetail's tables__
* __Filter data__
* __Apply necessary transformations__
* __Write filtered data to target table__

In [30]:
# Custom queries for source and target tables.
# Each query is written as a derived table ("( ... ) AS temp") because it is
# handed to spark.read.jdbc() through the `table` argument.
# The GROUP BY in the source query has no aggregates; it only de-duplicates the
# selected shipping attributes per PODetailID.
# NOTE(review): the source query joins to [ActiveSooperWizerNCL].[Packing].[PODetail]
# by hard-coded database name, while the rows are written to whatever database
# target_connection_properties points at — confirm these are the same database
# in every environment.
shipdtl_source_query = """
    (
    SELECT po.PODetailID, cos.ExFactoryDate,cos.ShipTo,cos.TransportMethod,cos.Col2 AS StoreDetail,cos.ShipType,cos.PackingDtl,128 AS CreatedBy
    FROM [NCPMS].[dbo].[CutOrdSheetDtl] AS cos
    JOIN [ActiveSooperWizerNCL].[Packing].[PODetail] AS po ON cos.OrId = po.OrId AND cos.PO = po.PO AND cos.ColorCode = po.ColorCode
    GROUP BY po.PODetailID, cos.ExFactoryDate, cos.ShipTo, cos.TransportMethod, cos.Col2,cos.ShipType,cos.PackingDtl
    ) AS temp
"""

# Only the key columns are read from the target; they are all the anti-join needs.
shipdtl_target_query = """
    (
    SELECT DISTINCT PODetailID,ExFactoryDate,ShipTo,TransportMethod,StoreDetail,ShipType,PackingDtl FROM [Packing].[ShipDetail] 
    ) AS temp
"""


# Load data from source and target tables
shipdtl_source_df = spark.read.jdbc(url=source_connection_properties['url'],
                                    table=shipdtl_source_query,
                                    properties=source_connection_properties)

shipdtl_target_df = spark.read.jdbc(url=target_connection_properties['url'],
                                    table=shipdtl_target_query,
                                    properties=target_connection_properties)


# Keep only the source rows whose seven-column key is not yet present in the
# target table.
# eqNullSafe (SQL <=>) is used instead of ==: with ==, a NULL on either side
# makes the comparison NULL, so a source row with any NULL attribute (e.g. no
# ShipTo or PackingDtl) would never match its already-loaded copy and would be
# appended again on every run.
shipdtl_filtered_df = shipdtl_source_df.join(
    shipdtl_target_df,
    shipdtl_source_df["PODetailID"].eqNullSafe(shipdtl_target_df["PODetailID"])
    & shipdtl_source_df["ExFactoryDate"].eqNullSafe(shipdtl_target_df["ExFactoryDate"])
    & shipdtl_source_df["ShipTo"].eqNullSafe(shipdtl_target_df["ShipTo"])
    & shipdtl_source_df["TransportMethod"].eqNullSafe(shipdtl_target_df["TransportMethod"])
    & shipdtl_source_df["StoreDetail"].eqNullSafe(shipdtl_target_df["StoreDetail"])
    & shipdtl_source_df["ShipType"].eqNullSafe(shipdtl_target_df["ShipType"])
    & shipdtl_source_df["PackingDtl"].eqNullSafe(shipdtl_target_df["PackingDtl"]),
    "left_anti",
).orderBy("PODetailID")

# No further transformations are applied to the dataframe at present.


# Count once and reuse the number for both the log line and the write guard;
# a separate isEmpty() call would launch another Spark job and re-run both
# JDBC reads just to learn the same thing.
shipdtl_new_row_count = shipdtl_filtered_df.count()
print(f"Number of records in dataframe :  {shipdtl_new_row_count}")

# # Limit number of records (uncomment to cap the batch while testing)
# shipdtl_filtered_df=shipdtl_filtered_df.limit(10)


# Append the new rows only when there is something to write
if shipdtl_new_row_count > 0:
    shipdtl_filtered_df.write.jdbc(url=target_connection_properties['url'],
                                   table="[Packing].[ShipDetail]",
                                   mode="append",
                                   properties=target_connection_properties)
else:
    print("No data to write to target table.")




Number of records in dataframe :  0
No data to write to target table.


## PackAndShipDetail Integration
* __Custom queries for PackAndShipDetail's source and target tables__
* __Load data from source and target PackAndShipDetail's tables__
* __Filter data__
* __Apply necessary transformations__
* __Write filtered data to target table__

In [34]:

# Custom queries for source and target tables.
# Each query is written as a derived table ("( ... ) AS temp") because it is
# handed to spark.read.jdbc() through the `table` argument.
# The source query resolves UpcMappingDetailID and ShipDetailID by joining the
# cut-order sheet to the PODetail, UpcMappingDetail and ShipDetail tables.
# NOTE(review): those three tables are referenced by the hard-coded database
# name [ActiveSooperWizerNCL], while the rows are written to whatever database
# target_connection_properties points at — confirm these are the same database
# in every environment.
# NOTE(review): the SQL joins use plain "=", so cut-order rows with a NULL in
# any join column (e.g. ShipTo, PackingDtl, DetailUPC) are dropped by the
# server-side join — confirm that is intended.
PackAndshipdtl_source_query = """
    (
    SELECT upc.UpcMappingDetailID, shd.ShipDetailID, cos.Kit, 1 AS Locked, cos.PackRatio, cos.PackType, SUM(cos.OrderQty) AS OrderQty,
    SUM(cos.UDT) AS UDT, SUM(cos.ODT) AS ODT, 'N/A' AS Ext ,Max(cos.CLength) AS CLength ,Max(cos.CWidth) AS CWidth ,Max(cos.CHeight) AS CHeight,128 AS CreatedBy
    FROM [NCPMS].[dbo].[CutOrdSheetDtl] AS cos
    JOIN [ActiveSooperWizerNCL].[Packing].[PODetail] AS po ON cos.OrId = po.OrId AND cos.PO = po.PO And cos.ColorCode = po.ColorCode
    JOIN [ActiveSooperWizerNCL].[Packing].[UpcMappingDetail] AS upc ON po.PODetailID = upc.PODetailID AND cos.ItemSize = upc.ItemSize AND cos.DetailUPC = upc.DetailUPC 
    JOIN [ActiveSooperWizerNCL].[Packing].[ShipDetail] AS shd ON cos.ExFactoryDate = shd.ExFactoryDate AND cos.ShipTo = shd.ShipTo AND cos.TransportMethod = shd.TransportMethod
    AND cos.Col2 = shd.StoreDetail AND cos.ShipType = shd.ShipType AND cos.PackingDtl = shd.PackingDtl AND po.PODetailID = shd.PODetailID
    GROUP BY upc.UpcMappingDetailID,shd.ShipDetailID,cos.Kit,cos.PackRatio,cos.PackType,cos.ExFactoryDate,cos.ShipTo,
    cos.TransportMethod,cos.Col2,cos.ShipType,cos.PackingDtl
    ) AS temp
"""

# Only the key columns are read from the target; they are all the anti-join needs.
PackAndshipdtl_target_query = """
    (
    SELECT DISTINCT UpcMappingDetailID,ShipDetailID,Kit,PackRatio,PackType FROM [Packing].[PackAndShipDetail] 
    ) AS temp
"""


# Load data from source and target tables
PackAndshipdtl_source_df = spark.read.jdbc(url=source_connection_properties['url'],
                                           table=PackAndshipdtl_source_query,
                                           properties=source_connection_properties)

PackAndshipdtl_target_df = spark.read.jdbc(url=target_connection_properties['url'],
                                           table=PackAndshipdtl_target_query,
                                           properties=target_connection_properties)


# Keep only the source rows whose (UpcMappingDetailID, ShipDetailID, Kit,
# PackRatio, PackType) key is not yet present in the target table.
# eqNullSafe (SQL <=>) is used instead of ==: with ==, a NULL on either side
# makes the comparison NULL, so a source row with a NULL Kit, PackRatio or
# PackType would never match its already-loaded copy and would be appended
# again on every run.
PackAndshipdtl_filtered_df = PackAndshipdtl_source_df.join(
    PackAndshipdtl_target_df,
    PackAndshipdtl_source_df["UpcMappingDetailID"].eqNullSafe(PackAndshipdtl_target_df["UpcMappingDetailID"])
    & PackAndshipdtl_source_df["ShipDetailID"].eqNullSafe(PackAndshipdtl_target_df["ShipDetailID"])
    & PackAndshipdtl_source_df["Kit"].eqNullSafe(PackAndshipdtl_target_df["Kit"])
    & PackAndshipdtl_source_df["PackRatio"].eqNullSafe(PackAndshipdtl_target_df["PackRatio"])
    & PackAndshipdtl_source_df["PackType"].eqNullSafe(PackAndshipdtl_target_df["PackType"]),
    "left_anti",
).orderBy("ShipDetailID", "UpcMappingDetailID")

# No further transformations are applied to the dataframe at present.


# Count once and reuse the number for both the log line and the write guard;
# a separate isEmpty() call would launch another Spark job and re-run both
# JDBC reads just to learn the same thing.
PackAndshipdtl_new_row_count = PackAndshipdtl_filtered_df.count()
print(f"Number of records in dataframe :  {PackAndshipdtl_new_row_count}")

# # Limit number of records (uncomment to cap the batch while testing)
# PackAndshipdtl_filtered_df=PackAndshipdtl_filtered_df.limit(10)


# Append the new rows only when there is something to write
if PackAndshipdtl_new_row_count > 0:
    PackAndshipdtl_filtered_df.write.jdbc(url=target_connection_properties['url'],
                                          table="[Packing].[PackAndShipDetail]",
                                          mode="append",
                                          properties=target_connection_properties)
else:
    print("No data to write to target table.")




Number of records in dataframe :  0
No data to write to target table.


## For Testing Connection

In [35]:
# # Define database connection properties for source and target
# source_connection_properties = {
#     "url": "jdbc:sqlserver://10.0.0.9:1435;databaseName=NCPMS;encrypt=false",
#     "user": "sa",
#     "password": "<REDACTED>",  # never commit real credentials; load them from config.json instead
#     "driver": "com.microsoft.sqlserver.jdbc.SQLServerDriver"
# }

# target_connection_properties = {
#     "url": "jdbc:sqlserver://10.0.0.9:1435;databaseName=NCLSooperWizerQA;encrypt=false",
#     "user": "sa",
#     "password": "<REDACTED>",  # never commit real credentials; load them from config.json instead
#     "driver": "com.microsoft.sqlserver.jdbc.SQLServerDriver"
# }

# # Custom queries for source and target tables
# order_source_query = """
#     (
#     SELECT OrId AS SaleOrderCode, Buyer AS Customer, CompanyId, BuyMonth, StyleNo,OrId 
#     FROM [dbo].[uniqueBundleWise_vw] 
#     WHERE CompanyId IN ('CW', 'NCL', 'PCI', 'FIN')
#     ) AS temp
# """

# order_target_query = """
#     (
#     SELECT DISTINCT SaleOrderCode 
#     FROM [Essentials].[SaleOrder]
#     ) AS temp
# """


# # Load data from source and target tables
# order_source_df = spark.read.jdbc(url=source_connection_properties['url'],
#                             table=order_source_query,
#                             properties=source_connection_properties)

# order_target_df = spark.read.jdbc(url=target_connection_properties['url'],
#                             table=order_target_query,
#                             properties=target_connection_properties)

# order_source_df.show()


## Stop SparkSession

In [8]:
# Stop the SparkSession created at the top of the notebook.
# Run this cell last: the cells above all read and write through `spark`.
spark.stop()