In [101]:
import os, sys, random, json
import pandas as pd
import numpy as np
import psycopg2

In [375]:
# data_preparation.py

def fetch_data_path(cwd):
    
    try:
        weather_data_path = os.path.join(cwd, 'wx_data')
        crop_data_path = os.path.join(cwd, 'yld_data')
        logging.info("weather data and crop data path variables created")
        
        return weather_data_path, crop_data_path
    except Exception as e:
        raise CustomException(e, sys)


def prepare_weather_data(input_path):
    
    try:
        filelists = []
        for f in os.listdir(input_path):
            tempdf = pd.read_csv(os.path.join(input_path, f), sep = '\t', names = ['date', 'max_temp', 'min_temp', 'precipitation_amt'])
            station_id = f[:f.index('.')]
            tempdf['max_temp'] = tempdf['max_temp']/10   # recording maximum temperature in celsius
            tempdf['min_temp'] = tempdf['min_temp']/10   # recording minimum temperature in celsius
            tempdf['precipitation_amt'] = tempdf['precipitation_amt']/100  # recording precipitation amount in centimeters
            tempdf['station_id'] = station_id
            tempdf['date'] = pd.to_datetime(tempdf.date, format = '%Y%m%d').dt.date
            tempdf['wid'] = tempdf['station_id'] + '_' + tempdf.date.astype(str)
            filelists.append(tempdf)
        logging.info('weather dataframe created and ready for ingestion into Postgres Table')
        
        return pd.concat(filelists, axis=0, ignore_index=True)
    except Exception as e:
        raise CustomException(e, sys)


def prepare_crop_data(input_path):
    
    try:
        for f in os.listdir(input_path):
            df = pd.read_csv(os.path.join(input_path, f), sep= '\t', names= ['year', 'crop_grain_yield'])
        logging.info('crop dataframe created and ready for ingestion into Postgres Table')
        
        return df
    except Exception as e:
        raise CustomException(e, sys)
    
    

In [377]:
weather_data_path, crop_data_path = fetch_data_path(os.getcwd())
weather_df = prepare_weather_data(weather_data_path)
crop_df = prepare_crop_data(crop_data_path)

In [381]:
weather_df

Unnamed: 0,date,max_temp,min_temp,precipitation_amt,station_id,wid
0,1985-01-01,-2.2,-12.8,0.94,USC00110072,USC00110072_1985-01-01
1,1985-01-02,-12.2,-21.7,0.00,USC00110072,USC00110072_1985-01-02
2,1985-01-03,-10.6,-24.4,0.00,USC00110072,USC00110072_1985-01-03
3,1985-01-04,-5.6,-18.9,0.00,USC00110072,USC00110072_1985-01-04
4,1985-01-05,1.1,-7.8,0.00,USC00110072,USC00110072_1985-01-05
...,...,...,...,...,...,...
1729952,2014-12-27,10.6,-0.6,0.00,USC00339312,USC00339312_2014-12-27
1729953,2014-12-28,12.8,1.1,0.91,USC00339312,USC00339312_2014-12-28
1729954,2014-12-29,3.3,-5.6,0.00,USC00339312,USC00339312_2014-12-29
1729955,2014-12-30,1.1,-8.3,0.00,USC00339312,USC00339312_2014-12-30


In [382]:
crop_df.head()

Unnamed: 0,year,crop_grain_yield
0,1985,225447
1,1986,208944
2,1987,181143
3,1988,125194
4,1989,191320


In [408]:
# database_operations.py

def create_db_conn(**db_params):
    
    try:
        conn = psycopg2.connect(
            host = db_params['hostname'], 
            port= db_params['port'], 
            database= db_params['dbname'], 
            user= db_params['username'], 
            password = db_params['password']
        )
        cursor = conn.cursor()
        logging.info('Postgres Database connection and cursor objects initialized')
        
        return conn, cursor
    except Exception as e:
        raise CustomException(e, sys)


def create_weather_data_table(connection, cursor):
    
    try:
        create_table = '''
                        CREATE TABLE IF NOT EXISTS weather_data(
                        date DATE NOT NULL,
                        max_temp INTEGER NOT NULL,
                        min_temp NUMERIC NOT NULL,
                        precipitation_amt NUMERIC NULL,
                        station_id TEXT NULL,
                        wid TEXT PRIMARY KEY NOT NULL);
                        '''
        cursor.execute(create_table)
        connection.commit()
        connection.close()
        logging.info("Table created: weather_data")
        
        return None
    except Exception as e:
        raise CustomException(e, sys)


def create_crop_yield_table(connection, cursor):
    
    try:
        create_table = '''
                        CREATE TABLE IF NOT EXISTS crop_yield_data(
                        year NUMERIC NOT NULL,
                        crop_grain_yield NUMERIC NOT NULL);
                       '''
        cursor.execute(create_table)
        connection.commit()
        connection.close()
        logging.info('Table created: crop_yield_data')
        
        return None
    except Exception as e:
        raise CustomException(e, sys)
    

def create_weather_data_transformed_table(connection, cursor):
    
    try:
        create_table = '''
                        CREATE TABLE IF NOT EXISTS weather_data_transformed(
                        years NUMERIC NOT NULL,
                        station_id TEXT NULL,
                        avg_min_temp NUMERIC NOT NULL,
                        avg_max_temp NUMERIC NOT NULL,
                        total_precipitation_amt NUMERIC NULL);
                        '''
        cursor.execute(create_table)
        connection.commit()
        connection.close()
        logging.info("Table created: weather_data_transformed")
        
        return None
    except Exception as e:
        raise CustomException(e, sys)


def insert_data_into_db(connection, cursor, table_name, df):
    
    try:
        count = 0
        start_time = datetime.now()
        if table_name == 'weather_data':
            df.date = df.date.astype(str)
        # Convert columns to the best possible dtypes using dtypes supporting pd.NA
        df = df.convert_dtypes()
        # Define column names for the tables
        column_names = df.columns.tolist()

        # Loop through rows of Pandas DataFrame and insert into PostgreSQL table
        for index, row in df.iterrows():
            # Create a tuple of values to insert
            values = tuple(row.values)

            # Check if the row already exists in the database
            sql_query = "SELECT EXISTS (SELECT 1 FROM {} WHERE {})".format(table_name, " AND ".join([f"{column}=%s" for column in column_names]))
            cursor.execute(sql_query, values)
            result = cursor.fetchone()

            # If the row doesn't already exist, insert it
            if not result[0]:
                sql_query = "INSERT INTO {} ({}) VALUES {}".format(table_name, ", ".join(column_names), values)
                cursor.execute(sql_query)
                count += 1

        # Commit changes to the database
        connection.commit()
        cursor.close()
        end_time = datetime.now()
        logging.info(f"Ingestion process started at {start_time} and finished at {end_time}, and a total number of {count} records were ingested.")
    
        return None
    except Exception as e:
        raise CustomException(e, sys)

In [383]:
conn, cursor = create_db_conn(**db_params)
create_weather_data_table(conn, cursor)
conn, cursor = create_db_conn(**db_params)
create_crop_yield_table(conn, cursor)

In [384]:
conn, cursor = create_db_conn(**db_params)
insert_data_into_db(conn, cursor, 'crop_yield_data', crop_df)
conn, cursor = create_db_conn(**db_params)
insert_data_into_db(conn, cursor, 'weather_data', weather_df)

In [415]:
# data_analysis.py
conn, cursor = create_db_conn(**db_params)

In [418]:
# data_analysis.py

def data_transformation(connection, cursor, table_name):
    
    try:
        start_time = datetime.now()
        sql_query = f'''
            SELECT EXTRACT(YEAR FROM date) as years, station_id, AVG(min_temp) as avg_min_temp, AVG(max_temp) as avg_max_temp, SUM(precipitation_amt) as total_precipitation_amt 
            FROM {table_name}
            WHERE min_temp <> -999.9 OR max_temp <> -999.9 OR precipitation_amt <> -99.99
            GROUP BY years, station_id;
            '''
        transformed_df = pd.read_sql(sql_query, con=connection)
        end_time = datetime.now()
        logging.info(f"Data Transformation process started at {start_time} and finished at {end_time}, and a total number of {len(transformed_df)} \
                     records were transformed.")
        
        return transformed_df
    except Exception as e:
        raise CustomException(e, sys)

In [411]:
create_weather_data_transformed_table(conn, cursor)

In [414]:
transformed_df = data_transformation(conn, cursor, 'weather_data')

In [416]:
insert_data_into_db(conn, cursor, 'weather_data_transformed', transformed_df)

In [419]:
transformed_df

Unnamed: 0,years,station_id,avg_min_temp,avg_max_temp,total_precipitation_amt
0,1985.0,USC00110072,-1.176164,15.320548,78.01
1,1985.0,USC00110187,-3.232877,8.076712,-53.43
2,1985.0,USC00110338,2.970685,14.430137,95.16
3,1985.0,USC00111280,6.513151,16.964384,114.02
4,1985.0,USC00111436,6.441370,16.712329,-575.30
...,...,...,...,...,...
4815,2014.0,USC00338552,-131.335068,-119.619178,-4205.41
4816,2014.0,USC00338769,-61.027123,-45.947945,93.48
4817,2014.0,USC00338822,2.370137,13.495890,82.22
4818,2014.0,USC00338830,-28.702192,-5.117808,-607.98


In [431]:
weather_df['date'] = pd.to_datetime(weather_df['date'])
weather_df['year'] = [x.year for x in weather_df['date']]

In [432]:
weather_df

Unnamed: 0,date,max_temp,min_temp,precipitation_amt,station_id,wid,year
0,1985-01-01,-2.2,-12.8,0.94,USC00110072,USC00110072_1985-01-01,1985
1,1985-01-02,-12.2,-21.7,0.00,USC00110072,USC00110072_1985-01-02,1985
2,1985-01-03,-10.6,-24.4,0.00,USC00110072,USC00110072_1985-01-03,1985
3,1985-01-04,-5.6,-18.9,0.00,USC00110072,USC00110072_1985-01-04,1985
4,1985-01-05,1.1,-7.8,0.00,USC00110072,USC00110072_1985-01-05,1985
...,...,...,...,...,...,...,...
1729952,2014-12-27,10.6,-0.6,0.00,USC00339312,USC00339312_2014-12-27,2014
1729953,2014-12-28,12.8,1.1,0.91,USC00339312,USC00339312_2014-12-28,2014
1729954,2014-12-29,3.3,-5.6,0.00,USC00339312,USC00339312_2014-12-29,2014
1729955,2014-12-30,1.1,-8.3,0.00,USC00339312,USC00339312_2014-12-30,2014


In [436]:
tran = pd.DataFrame(weather_df.groupby(['year','station_id']).agg({'max_temp':'mean', 'min_temp':'mean','precipitation_amt':'sum'}).reset_index())

In [None]:
df.groupby('A').agg({'B': ['mean', 'sum'], 'C': ['max', 'min']})


In [438]:
tran.columns

Index(['year', 'station_id', 'max_temp', 'min_temp', 'precipitation_amt'], dtype='object')

In [169]:
# utils.py

from configparser import ConfigParser

# Read Config file
cfg_file = 'config.ini'
config = ConfigParser()
config.read(cfg_file, encoding="utf-8")

# Storing the contents of the config file into respective dictionary variables
db_params = dict(config.items('db_params'))

In [332]:
# logger.py
import logging
import os
from datetime import datetime

LOG_FILE=f"{datetime.now().strftime('%m_%d_%Y_%H_%M_%S')}.log"
logs_path=os.path.join(os.getcwd(),"logs",LOG_FILE)
os.makedirs(logs_path, exist_ok=True)  # if the folder exists, keep on appending the log file to the folder itself

LOG_FILE_PATH=os.path.join(logs_path,LOG_FILE)


logging.basicConfig(
    filename=LOG_FILE_PATH,
    format="[ %(asctime)s ] %(lineno)d %(name)s - %(levelname)s - %(message)s", 
    level=logging.INFO
)


# Get the current time
start_time = datetime.now()

# Log the start time
# logging.info('Program started at %s', start_time)

# Your program code here...
y = 0
for i in range(10000):
    y = i + 1

# Get the current time
end_time = datetime.now()

# Log the results
logging.info(f'Program started at {start_time} and ended at {end_time} with number of records ingested: {y}')


In [324]:
# exception.py
import sys
#from src.logger import logging

def error_message_detail(error, error_detail:sys):
    _,_,exc_tb=error_detail.exc_info()    # Throws which file and which line an exception is occurring in
    file_name=exc_tb.tb_frame.f_code.co_filename    # Gets the file name
    error_message="Error occurred in python script name [{0}] line number [{1}] error message [{2}]".format(
        file_name,exc_tb.tb_lineno,str(error)
    )
    return error_message


class CustomException(Exception):
    def __init__(self, error_message, error_detail:sys):
        super().__init__(error_message)
        self.error_message=error_message_detail(error_message, error_detail=error_detail)

    def __str__(self):
        return self.error_message

In [325]:
try:
    a = 52
    b = 63
    c = a*b
    d = b/c
    e = 0
    f = a/e
except Exception as e:
    raise CustomException(e, sys)

CustomException: Error occurred in python script name [C:\Users\abhij\AppData\Local\Temp\ipykernel_27580\2960705939.py] line number [7] error message [division by zero]

In [126]:
conn = psycopg2.connect(host='localhost', port=5432, database='Crop_Weather_ETL', user='postgres', password='M408a1@543')

In [127]:
cursor = conn.cursor()

In [125]:
conn.close()

In [38]:
cursor.execute("INSERT INTO weather_transformed (w_key, temperature_recorded_date, max_temp_in_c, min_temp_in_c, precipitation_CM, region, file_name) \
                   VALUES ('1', '19940203', 42.4, -45.4, 102, 461, 12)");
conn.commit()

In [44]:
path = r'C:/Users/abhij/OneDrive - Indiana University/Interview Coding Tests/Corteva_Data Engineering/code-challenge-template/wx_data/'

In [62]:
csvlist = []
for i in os.listdir(path):
    dfname = 'df' + '_' + str(i)
    dfname = pd.read_csv(os.path.join(path, i), sep='\t', names=['Date', 'max_temp', 'min_temp', 'precipitation_amt'])
    dfname['fname'] = i
    csvlist.append(dfname)
    
df = pd.concat(csvlist, axis=0, ignore_index=True)

In [92]:
df = df.rename(columns={'Date':'date'})

In [142]:
df.head()

Unnamed: 0,date,max_temp,min_temp,precipitation_amt,fname
0,19850101,-22,-128,94,USC00110072.txt
1,19850102,-122,-217,0,USC00110072.txt
2,19850103,-106,-244,0,USC00110072.txt
3,19850104,-56,-189,0,USC00110072.txt
4,19850105,11,-78,0,USC00110072.txt


In [102]:
df.columns.tolist()

['date', 'max_temp', 'min_temp', 'precipitation_amt', 'fname']

In [115]:
# Define PostgreSQL column names
column_names = df.columns.tolist()

# Loop through rows of Pandas DataFrame and insert into PostgreSQL table
for i,row in df[:100].iterrows():
    sql = "INSERT INTO {} ({}) VALUES {} ON CONFLICT DO NOTHING".format('weather_new', ", ".join(column_names), tuple(row))
    cursor.execute(sql);
    
# Commit changes to the database
conn.commit()

In [274]:

column_names = weather_df.columns.tolist()

# Loop through rows of Pandas DataFrame and insert into PostgreSQL table
for index, row in df[:100].iterrows():
    # Create a tuple of values to insert
    values = tuple(row.values)
    
    # Check if the row already exists in the database
    sql = "SELECT EXISTS (SELECT 1 FROM {} WHERE {})".format('weather_data', " AND ".join([f"{column}=%s" for column in column_names]))
    cursor.execute(sql, values)
    result = cursor.fetchone()
    
    # If the row doesn't already exist, insert it
    if not result[0]:
        sql = "INSERT INTO {} ({}) VALUES {}".format('weather_data', ", ".join(column_names), values)
        cursor.execute(sql)

# Commit changes to the database
conn.commit()


IndexError: tuple index out of range

In [122]:
df['date'] = df.date.astype(str)

In [123]:
df.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 1729957 entries, 0 to 1729956
Data columns (total 5 columns):
 #   Column             Dtype 
---  ------             ----- 
 0   date               object
 1   max_temp           int64 
 2   min_temp           int64 
 3   precipitation_amt  int64 
 4   fname              object
dtypes: int64(3), object(2)
memory usage: 66.0+ MB
