In [2]:
#input_path ="C:\\Users\\arpit\\Downloads\\dyson\\input_files"

In [32]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import lit, current_timestamp
from pyspark.sql.functions import *

import yaml

In [40]:
# Function to read the configuration from config.yaml
def read_config():
    try:
        with open('config.yaml', 'r') as config_file:
            config = yaml.safe_load(config_file)
        return config
    except FileNotFoundError:
        print("Error: config.yaml file not found.")
        return None
    except yaml.YAMLError as e:
        print(f"Error loading config.yaml: {e}")
        return None

if __name__ == "__main__":
    # Read the configuration from config.yaml
    config = read_config()

    if config:
        # Access the input_path from the config
        input_path = config.get('input_path')
        output_path = config.get('output_path')

        # Print the input_path (you can use it in your PySpark pipeline)
        print(f"Input Path: {input_path}")
        print(f"Output Path: {output_path}")
    else:
        print("Configuration not loaded. Please check the YAML file and its location.")

    
    # Create a Spark session with the configured app name
    spark = SparkSession.builder.appName(config["spark"]["app_name"]).getOrCreate()



Input Path: /home/jovyan/work/digital/source
Output Path: /home/jovyan/work/digital/target


In [41]:
# Read the CSV file into a DataFrame
df_char = spark.read.csv(f"{input_path}/characters.csv", header=True, inferSchema=True)
df_char.show(5)

+-----------+---------------+
|characterID|           name|
+-----------+---------------+
|    1009220|Captain America|
|    1010740| Winter Soldier|
|    1009471|      Nick Fury|
|    1009552|   S.H.I.E.L.D.|
|    1009228|  Sharon Carter|
+-----------+---------------+
only showing top 5 rows



In [42]:
# Read the CSV file into a DataFrame
df_stats = spark.read.csv(f"{input_path}/characters_stats.csv", header=True, inferSchema=True)
df_stats = df_stats.withColumnRenamed("Name","name")
df_stats.show(5)

+-----------+---------+------------+--------+-----+----------+-----+------+-----+
|       name|Alignment|Intelligence|Strength|Speed|Durability|Power|Combat|Total|
+-----------+---------+------------+--------+-----+----------+-----+------+-----+
|    3-D Man|     good|          50|      31|   43|        32|   25|    52|  233|
|     A-Bomb|     good|          38|     100|   17|        80|   17|    64|  316|
| Abe Sapien|     good|          88|      14|   35|        42|   35|    85|  299|
|   Abin Sur|     good|          50|      90|   53|        64|   84|    65|  406|
|Abomination|      bad|          63|      80|   53|        90|   55|    95|  436|
+-----------+---------+------------+--------+-----+----------+-----+------+-----+
only showing top 5 rows



In [43]:
df_char_stats = df_char.join(df_stats, on="name", how="inner")
# Add audit columns
df_char_stats = df_char_stats.withColumn("batch_id", lit("101"))
df_char_stats = df_char_stats.withColumn("load_date", current_timestamp().cast("string"))
df_char_stats.show(5)


+---------------+-----------+---------+------------+--------+-----+----------+-----+------+-----+--------+--------------------+
|           name|characterID|Alignment|Intelligence|Strength|Speed|Durability|Power|Combat|Total|batch_id|           load_date|
+---------------+-----------+---------+------------+--------+-----+----------+-----+------+-----+--------+--------------------+
|Captain America|    1009220|     good|          63|      19|   35|        56|   46|   100|  319|     101|2023-09-16 05:46:...|
| Winter Soldier|    1010740|     good|          56|      32|   35|        65|   60|    84|  332|     101|2023-09-16 05:46:...|
|      Nick Fury|    1009471|     good|          75|      11|   23|        42|   25|   100|  276|     101|2023-09-16 05:46:...|
|       Punisher|    1009515|     good|          50|      16|   23|        28|   22|   100|  239|     101|2023-09-16 05:46:...|
|      Red Skull|    1009535|      bad|          75|      10|   12|        14|   19|    80|  210|     10

In [5]:
# Save the DataFrame to a Parquet file
dfp_char_stats = df_char_stats.toPandas()
try:
    df_char_stats.write.parquet(f"{output_path}/char_stats_day_dly", mode="overwrite")
    # Print a message to confirm the file has been saved
    print(f"DataFrame saved to Parquet file: {output_path}")
except Exception as e:
    print(f"Error writing DataFrame to Parquet: {str(e)}")


NameError: name 'df_char_stats' is not defined

In [77]:
# Define the path to your SQL script
sql_script_path = "modelling.sql"

# Read and execute SQL statements from the script
with open(sql_script_path, "r") as script_file:
    sql_statements = script_file.read().split(";")  # Split statements by semicolon

    # Remove empty statements
    sql_statements = [statement.strip() for statement in sql_statements if statement.strip()]

    # Execute each SQL statement separately
    for statement in sql_statements:
        spark.sql(statement)
    # Save the SparkSession to a file
#spark.sparkContext.setCheckpointDir("path/to/checkpoint")  # Set checkpoint directory
#spark.save()
print("Table created Successfully")

Table created Successfully


In [78]:
spark.sql("describe formatted  db_sil_marvel.char_stats_day_dly").show(truncate=False)

+----------------------------+----------------------------+-------+
|col_name                    |data_type                   |comment|
+----------------------------+----------------------------+-------+
|name                        |string                      |null   |
|characterID                 |int                         |null   |
|Alignment                   |string                      |null   |
|Intelligence                |int                         |null   |
|Strength                    |int                         |null   |
|Speed                       |int                         |null   |
|Durability                  |int                         |null   |
|Power                       |int                         |null   |
|Combat                      |int                         |null   |
|Total                       |int                         |null   |
|batch_id                    |string                      |null   |
|load_date                   |string            

In [80]:
spark.sql("select * from db_sil_marvel.char_stats_day_dly").show(5,truncate=False)

+---------------+-----------+---------+------------+--------+-----+----------+-----+------+-----+--------+--------------------------+
|name           |characterID|Alignment|Intelligence|Strength|Speed|Durability|Power|Combat|Total|batch_id|load_date                 |
+---------------+-----------+---------+------------+--------+-----+----------+-----+------+-----+--------+--------------------------+
|Captain America|1009220    |good     |63          |19      |35   |56        |46   |100   |319  |101     |2023-09-16 05:52:14.072456|
|Winter Soldier |1010740    |good     |56          |32      |35   |65        |60   |84    |332  |101     |2023-09-16 05:52:14.072456|
|Nick Fury      |1009471    |good     |75          |11      |23   |42        |25   |100   |276  |101     |2023-09-16 05:52:14.072456|
|Punisher       |1009515    |good     |50          |16      |23   |28        |22   |100   |239  |101     |2023-09-16 05:52:14.072456|
|Red Skull      |1009535    |bad      |75          |10      |1

In [81]:
spark.sql('select count(1) total_heros,alignment from db_sil_marvel.char_stats_day_dly  group by 2;').show(truncate=False)

+-----------+---------+
|total_heros|alignment|
+-----------+---------+
|5          |neutral  |
|50         |bad      |
|143        |good     |
+-----------+---------+



In [8]:
import logging
from pyspark.sql import SparkSession
from pyspark.sql.functions import lit, current_timestamp
from pyspark.sql.functions import *
import yaml

# Configure logging
log_file = "data_pipeline.log"
logging.basicConfig(filename=log_file, level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')

# Function to read the configuration from config.yaml
def read_config():
    try:
        with open('config.yaml', 'r') as config_file:
            config = yaml.safe_load(config_file)
        return config
    except FileNotFoundError:
        logging.error("Error: config.yaml file not found.")
        return None
    except yaml.YAMLError as e:
        logging.error(f"Error loading config.yaml: {e}")
        return None

if __name__ == "__main__":
    try:
        # Read the configuration from config.yaml
        config = read_config()

        if config:
            # Accessing the input_path from the config
            input_path = config.get('input_path')
            output_path = config.get('output_path')
            log_path = config.get('log_path')
            table_name = config.get('table_name')
            source_file1 = config.get('source_file1')
            source_file2 = config.get('source_file2')

            # Logging input and output paths
            logging.info(f"Input Path: {input_path}")
            logging.info(f"Output Path: {output_path}")
        else:
            logging.error("Configuration not loaded. Please check the YAML file and its location.")

        # Creating a Spark session with the configured app name
        spark = SparkSession.builder.appName(config["spark"]["app_name"]).getOrCreate()

        # Reading the Characters CSV file into a DataFrame
        logging.info(f"Reading {source_file1}.")
        df_char = spark.read.csv(f"{input_path}/{source_file1}", header=True, inferSchema=True)
        
        # Reading the Character Stats CSV file into a DataFrame
        logging.info(f"Reading {source_file2}.")
        df_stats = spark.read.csv(f"{input_path}/{source_file2}", header=True, inferSchema=True)
        df_stats = df_stats.withColumnRenamed("Name","name")

        # Joining the character & Character Stats files
        logging.info("Joining character and Character Stats DataFrames.")
        df_char_stats = df_char.join(df_stats, on="name", how="inner")

        # Adding the audit columns
        logging.info("Adding audit columns.")
        df_char_stats = df_char_stats.withColumn("batch_id", lit("101"))
        df_char_stats = df_char_stats.withColumn("load_date", current_timestamp().cast("string"))

        # Saving the DataFrame to a Parquet file
        logging.info("Saving DataFrame to Parquet file.")
        dfp_char_stats = df_char_stats.toPandas()
        df_char_stats.write.parquet(f"{output_path}/char_stats_day_dly", mode="overwrite")

        # Print a message to confirm the file has been saved
        logging.info(f"DataFrame saved to Parquet file: {output_path}")

        # Defining the path to your SQL script which will be used for creating Data Objects such as schema and tables
        sql_script_path = "modelling.sql"

        # Reading and executing SQL statements from the modelling script
        logging.info("Reading and executing SQL statements from modelling script.")
        with open(sql_script_path, "r") as script_file:
            sql_statements = script_file.read().split(";")  # Split statements by semicolon

            # Removing if any empty statements in the modelling sql
            logging.info("Removing if any empty statements in the modelling sql.")
            sql_statements = [statement.strip() for statement in sql_statements if statement.strip()]

            # Execute each SQL statement separately
            for statement in sql_statements:
                spark.sql(statement)

        logging.info("Table created Successfully")

        #showing the stats of the table created for data analytics purpose.
        logging.info("Describing the table.")
        spark.sql("describe formatted  db_sil_marvel.char_stats_day_dly").show(truncate=False)

        logging.info("Displaying the first 5 rows of the table...")
        spark.sql("select * from db_sil_marvel.char_stats_day_dly").show(5, truncate=False)

        logging.info("Running data analytics query.")
        spark.sql('select count(1) total_heros,alignment from db_sil_marvel.char_stats_day_dly  group by 2;').show(truncate=False)

        logging.info("Data pipeline completed successfully")

    except Exception as e:
        logging.error(f"Error: {str(e)}")


+----------------------------+----------------------------+-------+
|col_name                    |data_type                   |comment|
+----------------------------+----------------------------+-------+
|name                        |string                      |null   |
|characterID                 |int                         |null   |
|Alignment                   |string                      |null   |
|Intelligence                |int                         |null   |
|Strength                    |int                         |null   |
|Speed                       |int                         |null   |
|Durability                  |int                         |null   |
|Power                       |int                         |null   |
|Combat                      |int                         |null   |
|Total                       |int                         |null   |
|batch_id                    |string                      |null   |
|load_date                   |string            

In [1]:
import logging
from pyspark.sql import SparkSession
from pyspark.sql.functions import lit, current_timestamp
from pyspark.sql.functions import *
import yaml
import os
import datetime
from datetime import datetime as dt

# Function to initialize the logger with the date-appended log file name
def init_logger(log_path):
    os.makedirs(log_path, exist_ok=True)
    log_file_name = f"data_pipeline_{dt.now().strftime('%Y%m%d')}.log"
    logging.basicConfig(
        filename=os.path.join(log_path, log_file_name),
        level=logging.INFO,
        format='%(asctime)s - %(levelname)s - %(message)s'
    )

# Configure logging
log_path = "/home/jovyan/work/digital/logs/"  # Set your log directory path here
init_logger(log_path)
    
# Function to read the configuration from config.yaml
def read_config():
    try:
        with open('config.yaml', 'r') as config_file:
            config = yaml.safe_load(config_file)
        return config
    except FileNotFoundError:
        logging.error("Error: config.yaml file not found.")
        return None
    except yaml.YAMLError as e:
        logging.error(f"Error loading config.yaml: {e}")
        return None

if __name__ == "__main__":
    try:
        # Read the configuration from config.yaml
        config = read_config()

        if config:
            # Accessing configuration parameters
            input_path = config.get('input_path')
            output_path = config.get('output_path')
            log_path = config.get('log_path')
            table_name = config.get('table_name')
            source_file1 = config.get('source_file1')
            source_file2 = config.get('source_file2')

            # Initialize the logger with the log_path from config
            #print(f"log path is :{log_path}")
            #init_logger(log_path)

            # Logging input and output paths
            logging.info(f"Input Path: {input_path}")
            logging.info(f"Output Path: {output_path}")
        else:
            logging.error("Configuration not loaded. Please check the YAML file and its location.")

        # Creating a Spark session with the configured app name
        spark = SparkSession.builder.appName(config["spark"]["app_name"]).getOrCreate()
        # Reading the Characters CSV file into a DataFrame
        logging.info(f"Reading {source_file1}.")
        df_char = spark.read.csv(f"{input_path}/{source_file1}", header=True, inferSchema=True)
        
        # Reading the Character Stats CSV file into a DataFrame
        logging.info(f"Reading {source_file2}.")
        df_stats = spark.read.csv(f"{input_path}/{source_file2}", header=True, inferSchema=True)
        df_stats = df_stats.withColumnRenamed("Name","name")

        # Joining the character & Character Stats files
        logging.info("Joining character and Character Stats DataFrames.")
        df_char_stats = df_char.join(df_stats, on="name", how="inner")

        # Adding the audit columns
        logging.info("Adding audit columns.")
        df_char_stats = df_char_stats.withColumn("batch_id", lit("101"))
        df_char_stats = df_char_stats.withColumn("load_date", current_timestamp().cast("string"))

        # Saving the DataFrame to a Parquet file
        logging.info("Saving DataFrame to Parquet file.")
        dfp_char_stats = df_char_stats.toPandas()
        df_char_stats.write.parquet(f"{output_path}/char_stats_day_dly", mode="overwrite")

        # Print a message to confirm the file has been saved
        logging.info(f"DataFrame saved to Parquet file: {output_path}")

        # Defining the path to your SQL script which will be used for creating Data Objects such as schema and tables
        sql_script_path = "modelling.sql"

        # Reading and executing SQL statements from the modelling script
        logging.info("Reading and executing SQL statements from modelling script.")
        with open(sql_script_path, "r") as script_file:
            sql_statements = script_file.read().split(";")  # Split statements by semicolon

            # Removing if any empty statements in the modelling sql
            logging.info("Removing if any empty statements in the modelling sql.")
            sql_statements = [statement.strip() for statement in sql_statements if statement.strip()]

            # Execute each SQL statement separately
            for statement in sql_statements:
                spark.sql(statement)

        logging.info("Table created Successfully")

        #showing the stats of the table created for data analytics purpose.
        logging.info("Describing the table.")
        spark.sql("describe formatted  db_sil_marvel.char_stats_day_dly").show(truncate=False)

        logging.info("Displaying the first 5 rows of the table.")
        spark.sql("select * from db_sil_marvel.char_stats_day_dly").show(5, truncate=False)

        logging.info("Running data analytics query.")
        spark.sql('select count(1) total_heros,alignment from db_sil_marvel.char_stats_day_dly  group by 2;').show(truncate=False)

        logging.info("Data pipeline completed successfully")

    except Exception as e:
        logging.error(f"Error: {str(e)}")


+----------------------------+----------------------------+-------+
|col_name                    |data_type                   |comment|
+----------------------------+----------------------------+-------+
|name                        |string                      |null   |
|characterID                 |int                         |null   |
|Alignment                   |string                      |null   |
|Intelligence                |int                         |null   |
|Strength                    |int                         |null   |
|Speed                       |int                         |null   |
|Durability                  |int                         |null   |
|Power                       |int                         |null   |
|Combat                      |int                         |null   |
|Total                       |int                         |null   |
|batch_id                    |string                      |null   |
|load_date                   |string            

In [15]:
import os
import logging
from datetime import datetime

def init_logger(log_path):
    os.makedirs(log_path, exist_ok=True)
    log_file_name = f"data_pipeline_{datetime.now().strftime('%Y%m%d')}.log"
    
    # Create a logger
    logger = logging.getLogger(__name__)
    logger.setLevel(logging.INFO)
    
    # Create a file handler and set the log file path
    file_handler = logging.FileHandler(os.path.join(log_path, log_file_name))
    
    # Create a formatter with the desired log format
    formatter = logging.Formatter('%(asctime)s - %(levelname)s - %(message)s')
    file_handler.setFormatter(formatter)
    
    # Add the file handler to the logger
    logger.addHandler(file_handler)
    
    return logger

# Example usage:
log_path = "/home/jovyan/work/digital/logs/"  # Set your log path here
logger = init_logger(log_path)
logger.info("This is a log message.")
