In [1]:
import os
import glob as gb
import pandas as pd
import logging
from datetime import datetime
import json
import xml.etree.ElementTree as et
import boto3
import io
from io import StringIO
import xml.etree.ElementTree as ET


# logging configuration

# getLogger() with no name returns the ROOT logger, so records that propagate
# from any other logger in the process also reach this file handler.
# NOTE(review): re-executing this code in the same process (e.g. re-running the
# notebook cell) adds another FileHandler, which duplicates every log line.
logger = logging.getLogger()
fhandler = logging.FileHandler(filename='Etllog.log', mode='a')
formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
fhandler.setFormatter(formatter)
logger.addHandler(fhandler)
logger.setLevel(logging.DEBUG)

# AWS credentials, read from a local JSON file with the two keys used below.
# The context manager closes the file once it has been parsed.
# NOTE(review): consider boto3's default credential chain (env vars, ~/.aws)
# instead of a hard-coded local secrets file.
with open("C:\\Users\\User\\iamconfig.json", encoding="utf-8") as config_file:
    config_data = json.load(config_file)
access_key = config_data["aws_access_key_id"]
secret_key = config_data["aws_secret_access_key"]

# creating a s3_client using our credentials
s3 = boto3.client("s3", aws_access_key_id=access_key, aws_secret_access_key=secret_key)
# Source bucket and the key prefix ("folder") that raw files are uploaded under.
bucket_name = "my-etl-project-source"
s3_folder = "sourcedataset"
def uploadfileintos3(source_dir="C:\\Users\\User\\source1"):
    """Upload every regular file in ``source_dir`` to the source S3 bucket.

    Each file is stored under the key ``f"{s3_folder}/{file_name}"`` in
    ``bucket_name``. Sub-directories are skipped.

    Args:
        source_dir: Local directory whose files are uploaded. Defaults to the
            project's raw-data folder.

    Returns:
        None.

    Raises:
        Any error from listing the directory or from the upload, after it is
        logged.
    """
    try:
        for file_name in os.listdir(source_dir):
            local_path = os.path.join(source_dir, file_name)
            if not os.path.isfile(local_path):
                # os.listdir also returns sub-directories; upload_file needs a file.
                continue
            s3_key = f"{s3_folder}/{file_name}"
            s3.upload_file(local_path, bucket_name, s3_key)
            logger.info(f'File uploaded into S3 bucket: {s3_key}')
        # Returning only after the loop ensures that every file is uploaded,
        # not just the first one.
        return None
    except Exception as e:
        logger.error(f'Error in uploading file into S3:{e}')
        raise

   
# Specify your bucket name
# NOTE(review): this re-assigns the same value bucket_name already has from earlier in the file.
bucket_name = 'my-etl-project-source'

# List objects in the bucket.
# NOTE(review): a single list_objects_v2 call returns one page of results (at
# most 1000 keys per the S3 API); keys beyond that page are not seen here.
# `response` is reused further down to filter the JSON and XML keys.
response = s3.list_objects_v2(Bucket=bucket_name)
for key in response.get('Contents', []):
    # Each entry is the object's metadata dict (Key, LastModified, Size, ...).
    print(key)
    print('*********************************************')

# Filter CSV files: keys ending in '.csv' (case-sensitive suffix match).
csv_files = [obj['Key'] for obj in response.get('Contents', []) if obj['Key'].endswith('.csv')]
print(csv_files)

# Function to read CSV from S3
def read_csv_from_s3(bucket, key):
    """Download the CSV object ``key`` from ``bucket`` and parse it with pandas.

    The object body is decoded as UTF-8 before parsing. Any failure (download,
    decode or parse) is logged at ERROR level and re-raised.
    """
    try:
        raw_bytes = s3.get_object(Bucket=bucket, Key=key)['Body'].read()
        csv_text = raw_bytes.decode('utf-8')
        logger.info('Data extracted from csv files')
        frame = pd.read_csv(StringIO(csv_text))
    except Exception as err:
        logger.error(f'Error in Extracting data from csv files:{err}')
        raise
    return frame

# Read and concatenate all CSV files into a single DataFrame
def combined_csv():
    """Read every key in ``csv_files`` from ``bucket_name`` and merge the results.

    Processing steps:
    - Concatenate all CSV frames.
    - Drop exact duplicate rows.
    - Drop rows whose values all equal the column names. Such rows are
      repeated header lines embedded in the data.
    - Reset the index to 0..n-1.

    Returns:
        The cleaned DataFrame, or an empty DataFrame when ``csv_files`` is
        empty.
    """
    dataframes = [read_csv_from_s3(bucket_name, key) for key in csv_files]
    if not dataframes:
        # pd.concat raises ValueError on an empty list.
        return pd.DataFrame()
    combined_df = pd.concat(dataframes, ignore_index=True)
    combined_df = combined_df.drop_duplicates()

    # Remove repeated header rows. Series.equals(Index) is always False, so an
    # element-wise comparison is required. Values are compared as strings
    # because an embedded header line is parsed as text.
    header = [str(col) for col in combined_df.columns]
    is_header_row = combined_df.astype(str).eq(header).all(axis=1)
    combined_df = combined_df[~is_header_row]

    # Reset index after cleaning
    return combined_df.reset_index(drop=True)


# Filter JSON files: keys from the bucket listing above that end in '.json'.
json_files = [obj['Key'] for obj in response.get('Contents', []) if obj['Key'].endswith('.json')]
print(json_files)
print('**************************************************')

# Read one JSON-lines file from S3
def read_json_from_s3(bucket, key):
    """Download the JSON-lines object ``key`` from ``bucket`` into a DataFrame.

    The body is decoded as UTF-8 and parsed with ``lines=True``, giving one
    row per JSON line.

    Args:
        bucket: S3 bucket name.
        key: Object key of the JSON file.

    Returns:
        The parsed DataFrame.

    Raises:
        Any error from the download, decode or parse step, after it is logged.
    """
    try:
        # Use the arguments; the key must not be taken from the global list,
        # which would read the same first file on every call.
        json_obj = s3.get_object(Bucket=bucket, Key=key)
        file_content = json_obj['Body'].read().decode('utf-8')
        df = pd.read_json(StringIO(file_content), lines=True)
        logger.info('Data extracted from json files')
        return df
    except Exception as e:
        logger.error(f'Error in Extracting data from json files:{e}')
        raise
   
def combined_json():
    """Read every key in ``json_files`` from ``bucket_name`` and merge the results.

    Processing steps:
    - Concatenate all JSON frames.
    - Drop exact duplicate rows.
    - Drop rows whose values all equal the column names (repeated header
      rows).
    - Reset the index to 0..n-1.

    Returns:
        The cleaned DataFrame, or an empty DataFrame when ``json_files`` is
        empty.
    """
    dataframes = [read_json_from_s3(bucket_name, key) for key in json_files]
    if not dataframes:
        # pd.concat raises ValueError on an empty list.
        return pd.DataFrame()
    combined_df = pd.concat(dataframes, ignore_index=True)
    combined_df = combined_df.drop_duplicates()

    # Remove repeated header rows. Series.equals(Index) is always False, so
    # compare element-wise, as strings.
    header = [str(col) for col in combined_df.columns]
    is_header_row = combined_df.astype(str).eq(header).all(axis=1)
    combined_df = combined_df[~is_header_row]

    # Reset index after cleaning
    return combined_df.reset_index(drop=True)


# Filter XML files: keys from the bucket listing above that end in '.xml'.
xml_files = [obj['Key'] for obj in response.get('Contents', []) if obj['Key'].endswith('.xml')]
print(xml_files)
print('***********************************************')

# Flatten a two-level XML document into a DataFrame
def parse_xml_to_df(xml_content):
    """Turn ``xml_content`` into a DataFrame with one row per child of the root.

    Each direct child of the root element becomes a record. Within a record,
    every sub-element's tag is a column name and its text is the value
    (``None`` for an element with no text). Deeper nesting is not traversed.
    """
    records = [
        {field.tag: field.text for field in record}
        for record in ET.fromstring(xml_content)
    ]
    return pd.DataFrame(records)

# Read XML files and convert to DataFrame
def combined_xml():
    """Download every key in ``xml_files`` from ``bucket_name`` and merge them.

    Each object is decoded as UTF-8 and flattened with ``parse_xml_to_df``.
    Processing steps:
    - Concatenate the flattened frames.
    - Drop exact duplicate rows.
    - Drop rows whose values all equal the column names.
    - Reset the index to 0..n-1, matching the CSV and JSON combiners.

    Returns:
        The cleaned DataFrame, or an empty DataFrame when ``xml_files`` is
        empty.
    """
    dataframes = []
    for xml_file in xml_files:
        obj = s3.get_object(Bucket=bucket_name, Key=xml_file)
        xml_content = obj['Body'].read().decode('utf-8')
        dataframes.append(parse_xml_to_df(xml_content))

    if not dataframes:
        # pd.concat raises ValueError on an empty list.
        return pd.DataFrame()

    # Combine all DataFrames
    combined_df = pd.concat(dataframes, ignore_index=True)
    combined_df = combined_df.drop_duplicates()

    # Remove repeated header rows with one vectorised comparison instead of a
    # per-row Python apply. The apply version also misbehaves on an empty
    # frame.
    header = [str(col) for col in combined_df.columns]
    is_header_row = combined_df.astype(str).eq(header).all(axis=1)
    combined_df = combined_df[~is_header_row]

    return combined_df.reset_index(drop=True)

def Combined_df_csv_xml_json():
    """Extract the CSV, JSON and XML sources and stack them into one DataFrame.

    The three sources are read in that order and concatenated with a fresh
    0..n-1 index. Duplicate rows across different sources are not removed.

    Returns:
        The stacked DataFrame.

    Raises:
        Any error from the three extract steps or the concat, after it is
        logged.
    """
    try:
        frames = [combined_csv(), combined_json(), combined_xml()]
        cdf = pd.concat(frames, ignore_index=True)
        logger.info('All data frames combined successfully')
        return cdf
    except Exception as e:
        logger.error(f'Error in combining all data frames:{e}')
        raise

# Run the extract step eagerly and display the combined frame.
# NOTE(review): etl_process() later calls Combined_df_csv_xml_json() again, so
# every S3 object is downloaded twice when the whole file runs.
DF=Combined_df_csv_xml_json()
print(DF)

def transform_data(DF):
    """Return a copy of ``DF`` with metric height and weight columns added.

    Steps:
    - Convert ``height`` and ``weight`` to numbers. A non-numeric value
      raises.
    - Append ``HeightsinMeters`` = height * 0.3048.
    - Append ``WeightsinKgs`` = weight * 0.453592.

    The caller's DataFrame is not modified.

    Raises:
        Any conversion error (e.g. a missing column or a non-numeric value),
        after it is logged.
    """
    try:
        # Work on a copy so the caller's frame (e.g. the module-level DF) is not
        # mutated as a side effect.
        out = DF.copy()
        out['height'] = pd.to_numeric(out['height'])
        # 0.3048 m per foot: assumes source heights are in feet (confirm with data owner).
        out['HeightsinMeters'] = out['height'] * 0.3048
        out['weight'] = pd.to_numeric(out['weight'])
        # 0.453592 kg per pound: assumes source weights are in pounds.
        out['WeightsinKgs'] = out['weight'] * 0.453592
        logger.info('Data transformed successfully')
        return out
    except Exception as e:
        logger.error(f'Error transforming data :{e}')
        raise
# Apply the unit-conversion transform to the eagerly extracted frame and display it.
# NOTE(review): etl_process() repeats this transform on a fresh extract.
df1=transform_data(DF)
print(df1)
# Load step: persist a DataFrame as CSV
def load_data(DF, CSVfile):
    """Write ``DF`` to ``CSVfile`` (a path or writable buffer) as CSV, without the index.

    Success is logged at INFO level. Any exception is logged at ERROR level
    and re-raised unchanged.
    """
    csv_options = {'index': False}
    try:
        DF.to_csv(CSVfile, **csv_options)
        logger.info('Data Loading is successful')
    except Exception as exc:
        logger.error(f'Error in Loading data :{exc}')
        raise
def etl_process(output_path='C:/Users/User/Source1_Output/csvfile.csv'):
    """Run the full pipeline: extract from S3, transform, and write a local CSV.

    Args:
        output_path: Destination file for the transformed data. Defaults to
            the project's local output location.

    Errors from any stage propagate to the caller. Each stage logs its own
    failure before re-raising.
    """
    data = Combined_df_csv_xml_json()
    transformed = transform_data(data)
    load_data(transformed, output_path)
    logger.info('ETL process completed successfully')
   

def upload_transformed_fileintos3(
    localfilepath="C:\\Users\\User\\Source1_output\\csvfile.csv",
    tbucket_name="my-etl-project-transformeddata",
    s3_key="Csvfile.csv",
):
    """Upload the transformed CSV to the transformed-data S3 bucket.

    Args:
        localfilepath: Local file to upload. Defaults to the ETL output file.
        tbucket_name: Destination bucket.
        s3_key: Destination object key.

    Returns:
        The result of ``s3.upload_file``. Earlier code returned the unrelated
        module-level bucket listing instead.

    Raises:
        Any upload error, after it is logged.
    """
    try:
        result = s3.upload_file(localfilepath, tbucket_name, s3_key)
        print(f"File uploaded successfully to {tbucket_name}")
        logger.info('File uploaded into S3 bucket')
        return result
    except Exception as e:
        logger.error(f'Error in uploading file into S3:{e}')
        raise

    



# Script entry point: run extract/transform/load, then upload the local output
# CSV to the transformed-data bucket.
# NOTE(review): the two steps spell the output directory 'Source1_Output' and
# 'Source1_output'; they only refer to the same file on a case-insensitive
# filesystem.
if __name__=='__main__':
    etl_process()
    
    upload_transformed_fileintos3()

{'Key': 'sourcedataset/source1.csv', 'LastModified': datetime.datetime(2025, 3, 4, 15, 37, 10, tzinfo=tzutc()), 'ETag': '"ce1d038c596e21cdda63fdd492ff888c"', 'ChecksumAlgorithm': ['CRC32'], 'ChecksumType': 'FULL_OBJECT', 'Size': 109, 'StorageClass': 'STANDARD'}
*********************************************
{'Key': 'sourcedataset/source1.json', 'LastModified': datetime.datetime(2025, 3, 4, 15, 37, 10, tzinfo=tzutc()), 'ETag': '"957400fea14917bedc31d2cd94b3a04b"', 'ChecksumAlgorithm': ['CRC32'], 'ChecksumType': 'FULL_OBJECT', 'Size': 188, 'StorageClass': 'STANDARD'}
*********************************************
{'Key': 'sourcedataset/source1.xml', 'LastModified': datetime.datetime(2025, 3, 4, 15, 37, 10, tzinfo=tzutc()), 'ETag': '"daab0cb0f18c3507dbab78405d718100"', 'ChecksumAlgorithm': ['CRC32'], 'ChecksumType': 'FULL_OBJECT', 'Size': 488, 'StorageClass': 'STANDARD'}
*********************************************
{'Key': 'sourcedataset/source2.csv', 'LastModified': datetime.datetime(2025