
# Data Pipelines with PostgreSQL

I am developing a data pipeline that efficiently collects, cleans, and analyzes equipment and network sensor data. The pipeline will help identify potential
equipment failures and schedule maintenance proactively, minimizing downtime and improving overall equipment performance.

## Pre-requisites

In [1]:
# Pre-requisite 1
# ---
# Importing pandas library for data manipulation
import pandas as pd
# Importing numpy library for scientific computations
import numpy as np
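# Importing psycopg2, the PostgreSQL driver used by SQLAlchemy
import psycopg2
# Importing create_engine to open a connection to the database
from sqlalchemy import create_engine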

## 1. Data Exploration

### Load and Review the datasets

In [2]:
# Dataset url = https://bit.ly/3YNdO2Y
# Equipment sensor dataset
equipment_df = pd.read_csv('https://raw.githubusercontent.com/GMwangi3/DE_Week7/main/equipment_sensor.csv')
equipment_df.head(5)

Unnamed: 0,ID,date,time,sensor_reading
0,1,2022-03-01,08:00:00,26.7
1,1,2022-03-01,08:15:00,28.4
2,1,2022-03-01,08:30:00,27.8
3,2,2022-03-01,08:00:00,99.1
4,2,2022-03-01,08:15:00,97.5


In [5]:
# Network sensor dataset
network_df = pd.read_csv('https://raw.githubusercontent.com/GMwangi3/DE_Week7/main/network_sensor.csv')
network_df.head(5)

Unnamed: 0,ID,date,time,sensor_reading
0,1,2022-03-01,08:00:00,0.58
1,1,2022-03-01,08:15:00,0.62
2,1,2022-03-01,08:30:00,0.6
3,2,2022-03-01,08:00:00,0.89
4,2,2022-03-01,08:15:00,0.85


In [4]:
# Maintenance records dataset
maintenance_df = pd.read_csv('https://raw.githubusercontent.com/GMwangi3/DE_Week7/main/maintenance_records.csv')
maintenance_df.head(5)

Unnamed: 0,ID,date,time,equipment_ID,maintenance_type
0,1,2022-03-01,10:00:00,1,Preventive Maintenance
1,2,2022-03-02,14:30:00,2,Corrective Maintenance
2,3,2022-03-03,08:00:00,1,Corrective Maintenance


## 2. Data Preparation

### Convert column names to lower

In [6]:
# Function that lowercases the column names of a given dataframe in place
def colnames_lower(df):
  df.columns = df.columns.str.lower()

df_list = [equipment_df, network_df, maintenance_df]

for df in df_list:
  colnames_lower(df)

### Get column data types

In [7]:
# Function that prints the data types of columns for a given dataframe
def get_datatypes(df):
  df_name =[x for x in globals() if globals()[x] is df][0]
  print("\n" + df_name)
  print("=================")
  print(df.dtypes)

df_list = [equipment_df, network_df, maintenance_df]

for df in df_list:
  get_datatypes(df)


equipment_df
id                  int64
date               object
time               object
sensor_reading    float64
dtype: object

network_df
id                  int64
date               object
time               object
sensor_reading    float64
dtype: object

maintenance_df
id                   int64
date                object
time                object
equipment_id         int64
maintenance_type    object
dtype: object


### Missing Values

In [8]:
# Function that prints the number of missing values per column for a given dataframe
def get_nulls(df):
  df_name =[x for x in globals() if globals()[x] is df][0]
  print("\n" + df_name)
  print("=================\n")
  print(df.isna().sum())

for df in df_list:
  get_nulls(df)


equipment_df

id                0
date              0
time              0
sensor_reading    0
dtype: int64

network_df

id                0
date              0
time              0
sensor_reading    0
dtype: int64

maintenance_df

id                  0
date                0
time                0
equipment_id        0
maintenance_type    0
dtype: int64


### Duplicate data

In [9]:
# Function that prints number of duplicate records
def get_duplicates(df):
  df_name =[x for x in globals() if globals()[x] is df][0]
  print("\n" + df_name)
  print("=================\n")
  print(sum(df.duplicated()))

for df in df_list:
  get_duplicates(df)


equipment_df

0

network_df

0

maintenance_df

0


There are no missing values and no duplicate records in any of the three datasets.
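The three checks above (data types aside) can also be collected into one table. The following is a minimal sketch, assuming the dataframes are passed in a dictionary keyed by name, which avoids looking names up through `globals()`. The helper `summarize_quality` and the toy frames are hypothetical and not part of the original notebook.

```python
import pandas as pd

def summarize_quality(frames):
    """Return one row per dataframe with row, null, and duplicate counts."""
    rows = []
    for name, df in frames.items():
        rows.append({
            "dataset": name,
            "rows": len(df),
            # Total number of missing cells across all columns
            "missing_values": int(df.isna().sum().sum()),
            # Number of rows that exactly repeat an earlier row
            "duplicate_rows": int(df.duplicated().sum()),
        })
    return pd.DataFrame(rows).set_index("dataset")

# Toy frames for illustration only (not the real datasets)
demo_frames = {
    "equipment_df": pd.DataFrame({"id": [1, 1], "sensor_reading": [26.7, 28.4]}),
    "maintenance_df": pd.DataFrame({"id": [1, 2], "maintenance_type": ["Preventive Maintenance", None]}),
}
summary = summarize_quality(demo_frames)
print(summary)
```

In the notebook this could be called as `summarize_quality({"equipment_df": equipment_df, "network_df": network_df, "maintenance_df": maintenance_df})`.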

## 3. Data Transformation

### Combine the date and time columns

In [10]:
equipment_df['sensor_date_time'] = pd.to_datetime(equipment_df['date'] + ' ' + equipment_df['time']).dt.strftime('%Y-%m-%d %H:%M:%S')
network_df['sensor_date_time'] = pd.to_datetime(network_df['date'] + ' ' + network_df['time']).dt.strftime('%Y-%m-%d %H:%M:%S')
maintenance_df['maintenance_date_time'] = pd.to_datetime(maintenance_df['date'] + ' ' + maintenance_df['time']).dt.strftime('%Y-%m-%d %H:%M:%S')
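The cell above stores the combined value as formatted text. As a small illustration of the same step on toy data, the sketch below parses the concatenated strings once, keeps a true datetime column, and derives the text form from it. The frame `sample` and the column `sensor_date_time_str` are hypothetical names used only for this example.

```python
import pandas as pd

# Toy frame mirroring the layout of the sensor datasets
sample = pd.DataFrame({
    "date": ["2022-03-01", "2022-03-01"],
    "time": ["08:00:00", "08:15:00"],
})

# Concatenate the two string columns and parse them with an explicit format
combined = pd.to_datetime(sample["date"] + " " + sample["time"],
                          format="%Y-%m-%d %H:%M:%S")

# A datetime column supports sorting, resampling, and time arithmetic
sample["sensor_date_time"] = combined
# A formatted string column matches what the notebook stores
sample["sensor_date_time_str"] = combined.dt.strftime("%Y-%m-%d %H:%M:%S")

print(sample.dtypes)
```

Keeping the datetime dtype may be preferable when the column is later written to a `TIMESTAMP` column, although the string form used in the notebook is also accepted by PostgreSQL.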

### Drop date and time columns

In [11]:
equipment_df.drop(['date','time'], axis=1, inplace=True)
network_df.drop(['date','time'], axis=1, inplace=True)
maintenance_df.drop(['date','time'], axis=1, inplace=True)

### Date and Time Formatting

In [12]:
# Convert the date/time columns (superseded by the combined date-time columns above; kept for reference)
# equipment_df['date'] = pd.to_datetime(equipment_df['date']).dt.strftime('%Y-%m-%d')
# equipment_df['time'] = pd.to_datetime(equipment_df['time']).dt.strftime('%H:%M:%S')
# network_df['date'] = pd.to_datetime(network_df['date']).dt.strftime('%Y-%m-%d')
# network_df['time'] = pd.to_datetime(network_df['time']).dt.strftime('%H:%M:%S')
# maintenance_df['date'] = pd.to_datetime(maintenance_df['date']).dt.strftime('%Y-%m-%d')
# maintenance_df['time'] = pd.to_datetime(maintenance_df['time']).dt.strftime('%H:%M:%S')

### Rename column names

In [13]:
equipment_df=equipment_df.rename(columns={'sensor_reading':'equipment_sensor_reading'})
network_df=network_df.rename(columns={'sensor_reading':'network_sensor_reading'})
# maintenance_df=maintenance_df.rename(columns={'date':'maintenance_date','time':'maintenance_time'})

# Drop the id column for the maintenance_df
maintenance_df.drop(['id'], axis=1, inplace=True)

### Merging the Datasets

In [14]:
# Merge equipment and network sensor dataframes
#sensor_readings_df = pd.merge(left=equipment_df, right=network_df, how='left', left_on=['id','date','time'], right_on= ['id','date','time'])
sensor_readings_df = equipment_df.merge(network_df, how='left', on=['id','sensor_date_time'])

#sensor_readings_df
final_merged_df = pd.merge(left=sensor_readings_df, right=maintenance_df, how='left', left_on=['id'], right_on= ['equipment_id'])

# Drop the equipment_id column for the final_merged dataframe
final_merged_df.drop(['equipment_id'], axis=1, inplace=True)
# Rename id column to equipment_id
final_merged_df=final_merged_df.rename(columns={'id':'equipment_id'})
final_merged_df = final_merged_df[['equipment_id','sensor_date_time','equipment_sensor_reading','network_sensor_reading','maintenance_date_time','maintenance_type']]

final_merged_df

Unnamed: 0,equipment_id,sensor_date_time,equipment_sensor_reading,network_sensor_reading,maintenance_date_time,maintenance_type
0,1,2022-03-01 08:00:00,26.7,0.58,2022-03-01 10:00:00,Preventive Maintenance
1,1,2022-03-01 08:00:00,26.7,0.58,2022-03-03 08:00:00,Corrective Maintenance
2,1,2022-03-01 08:15:00,28.4,0.62,2022-03-01 10:00:00,Preventive Maintenance
3,1,2022-03-01 08:15:00,28.4,0.62,2022-03-03 08:00:00,Corrective Maintenance
4,1,2022-03-01 08:30:00,27.8,0.6,2022-03-01 10:00:00,Preventive Maintenance
5,1,2022-03-01 08:30:00,27.8,0.6,2022-03-03 08:00:00,Corrective Maintenance
6,2,2022-03-01 08:00:00,99.1,0.89,2022-03-02 14:30:00,Corrective Maintenance
7,2,2022-03-01 08:15:00,97.5,0.85,2022-03-02 14:30:00,Corrective Maintenance
8,2,2022-03-01 08:30:00,98.2,0.88,2022-03-02 14:30:00,Corrective Maintenance
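In the merged output, every sensor reading for equipment 1 appears twice because that equipment has two maintenance records. The sketch below reproduces this one-to-many effect on toy data and shows how the `validate` argument of `DataFrame.merge` can flag it. The frames `sensors` and `maintenance` are illustrative stand-ins, not the real datasets.

```python
import pandas as pd

# Toy sensor readings: two for equipment 1, one for equipment 2
sensors = pd.DataFrame({
    "id": [1, 1, 2],
    "sensor_date_time": ["2022-03-01 08:00:00", "2022-03-01 08:15:00", "2022-03-01 08:00:00"],
    "equipment_sensor_reading": [26.7, 28.4, 99.1],
})
# Toy maintenance records: equipment 1 was serviced twice
maintenance = pd.DataFrame({
    "equipment_id": [1, 2, 1],
    "maintenance_type": ["Preventive Maintenance", "Corrective Maintenance", "Corrective Maintenance"],
})

# A left merge on the equipment key repeats each reading once per matching record
merged = sensors.merge(maintenance, how="left", left_on="id", right_on="equipment_id")
rows_per_reading = merged.groupby(["id", "sensor_date_time"]).size()
print(rows_per_reading)

# validate="many_to_one" raises MergeError when the right-hand keys are not unique
try:
    sensors.merge(maintenance, how="left", left_on="id",
                  right_on="equipment_id", validate="many_to_one")
    keys_unique = True
except pd.errors.MergeError:
    keys_unique = False
print("maintenance keys unique:", keys_unique)
```

Whether the repeated rows are wanted depends on the downstream analysis; aggregates over the sensor readings would count each reading once per maintenance record.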


## 4. Data Loading

### Load the merged dataframe to Postgres DB

In [15]:
host="157.245.102.81"
port=5432
dbname="dq"
user="postgres"
password="E*3b8km$dpmRLLuf1Rs$"

# Establish connections (the port defined above is included in the connection string)
conn_string = f"postgresql://{user}:{password}@{host}:{port}/{dbname}"
db = create_engine(conn_string)
conn = db.raw_connection()
conn.autocommit = True

# create a cursor object
cur = conn.cursor()

create_table_query = """
CREATE TABLE IF NOT EXISTS sensor_data (
    id SERIAL PRIMARY KEY,
    equipment_id VARCHAR(64),
    sensor_date_time TIMESTAMP,
    equipment_sensor_reading NUMERIC(10,4),
    network_sensor_reading NUMERIC(10,4),
    maintenance_date_time TIMESTAMP,
    maintenance_type VARCHAR(255)
);
"""

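try:
  cur.execute(create_table_query)
  conn.commit()
except Exception as e:
  # Report the underlying error instead of hiding it
  print(f"Table creation failed: {e}")
else:
  # Load the data only if the table was created successfully
  final_merged_df.to_sql('sensor_data', db, if_exists='append', index=False)
  cur.execute("SELECT * from sensor_data")
  print(cur.fetchone())
  # cur.execute('Drop table sensor_data')
finally:
  # Always release the cursor and the connection
  cur.close()
  conn.close()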

OperationalError: ignored
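Connection settings are often read from the environment rather than written into the notebook, and special characters in a password need URL-encoding before they go into a connection string. The sketch below is one possible way to do this using only the standard library. The helper `build_conn_string` is hypothetical; the variable names `PGUSER`, `PGPASSWORD`, `PGHOST`, `PGPORT`, and `PGDATABASE` follow the libpq convention, and the host and password in the example are placeholders.

```python
import os
from urllib.parse import quote_plus

def build_conn_string(env=os.environ):
    """Build a PostgreSQL URL from environment-style settings."""
    user = env.get("PGUSER", "postgres")
    password = env.get("PGPASSWORD", "")
    host = env.get("PGHOST", "localhost")
    port = env.get("PGPORT", "5432")
    dbname = env.get("PGDATABASE", "postgres")
    # quote_plus escapes characters such as '@' and '/' that would break the URL
    return (f"postgresql://{quote_plus(user)}:{quote_plus(password)}"
            f"@{host}:{port}/{dbname}")

# Example with placeholder settings passed as a plain dictionary
example = build_conn_string({
    "PGUSER": "postgres",
    "PGPASSWORD": "p@ss/word",
    "PGHOST": "db.example.com",
    "PGPORT": "5432",
    "PGDATABASE": "dq",
})
print(example)
```

The resulting string can be passed to `create_engine` in place of a hard-coded `conn_string`.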