<a href="https://colab.research.google.com/github/WKhisa/Data-Pipelines-with-Python-Project/blob/main/Data_Pipelines_with_Python_and_PostgreSQL_IPP.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# **Pipeline to extract, clean and load data to PostgreSQL database**

In [2]:
#Load Prerequisites
import pandas as pd
import psycopg2
from sqlalchemy import create_engine

# REad Data

In [3]:
#Load Raw Data
equipment_raw_df= pd.read_csv('equipment_sensor.csv')
maintenance_raw_df= pd.read_csv('maintenance_records.csv')
network_raw_df= pd.read_csv('network_sensor.csv')

# Review Clean and standardize Data

Equipment Data

In [4]:
#Preview equipment
equipment_raw_df.head()


Unnamed: 0,ID,date,time,sensor_reading
0,1,2022-03-01,08:00:00,26.7
1,1,2022-03-01,08:15:00,28.4
2,1,2022-03-01,08:30:00,27.8
3,2,2022-03-01,08:00:00,99.1
4,2,2022-03-01,08:15:00,97.5


In [5]:
equipment_raw_df.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 6 entries, 0 to 5
Data columns (total 4 columns):
 #   Column          Non-Null Count  Dtype  
---  ------          --------------  -----  
 0   ID              6 non-null      int64  
 1   date            6 non-null      object 
 2   time            6 non-null      object 
 3   sensor_reading  6 non-null      float64
dtypes: float64(1), int64(1), object(2)
memory usage: 320.0+ bytes


In [6]:
equipment_raw_df['date'] = pd.to_datetime(equipment_raw_df['date'])
equipment_raw_df['time'] = pd.to_datetime(equipment_raw_df['time'], format='%H:%M:%S').dt.time
equipment_raw_df['timestamp'] = pd.to_datetime(equipment_raw_df['date'].astype(str) + ' ' + equipment_raw_df['time'].astype(str))
equipment_raw_df = equipment_raw_df.drop(['date', 'time'], axis=1)
equipment_raw_df.head()

Unnamed: 0,ID,sensor_reading,timestamp
0,1,26.7,2022-03-01 08:00:00
1,1,28.4,2022-03-01 08:15:00
2,1,27.8,2022-03-01 08:30:00
3,2,99.1,2022-03-01 08:00:00
4,2,97.5,2022-03-01 08:15:00


In [26]:
equipment_raw_df.rename(columns = {'sensor_reading':'equip_sensor_reading'}, inplace = True)
equipment_raw_df

Unnamed: 0,ID,equip_sensor_reading,timestamp
0,1,26.7,2022-03-01 08:00:00
1,1,28.4,2022-03-01 08:15:00
2,1,27.8,2022-03-01 08:30:00
3,2,99.1,2022-03-01 08:00:00
4,2,97.5,2022-03-01 08:15:00
5,2,98.2,2022-03-01 08:30:00


Maintenance Data

In [7]:
maintenance_raw_df.head()


Unnamed: 0,ID,date,time,equipment_ID,maintenance_type
0,1,2022-03-01,10:00:00,1,Preventive Maintenance
1,2,2022-03-02,14:30:00,2,Corrective Maintenance
2,3,2022-03-03,08:00:00,1,Corrective Maintenance


In [8]:
maintenance_raw_df.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 3 entries, 0 to 2
Data columns (total 5 columns):
 #   Column            Non-Null Count  Dtype 
---  ------            --------------  ----- 
 0   ID                3 non-null      int64 
 1   date              3 non-null      object
 2   time              3 non-null      object
 3   equipment_ID      3 non-null      int64 
 4   maintenance_type  3 non-null      object
dtypes: int64(2), object(3)
memory usage: 248.0+ bytes


In [9]:
maintenance_raw_df['date'] = pd.to_datetime(maintenance_raw_df['date'])
maintenance_raw_df['time'] = pd.to_datetime(maintenance_raw_df['time'], format='%H:%M:%S').dt.time
maintenance_raw_df['timestamp'] = pd.to_datetime(maintenance_raw_df['date'].astype(str) + ' ' + maintenance_raw_df['time'].astype(str))
maintenance_raw_df = maintenance_raw_df.drop(['date', 'time'], axis=1)
maintenance_raw_df.head()

Unnamed: 0,ID,equipment_ID,maintenance_type,timestamp
0,1,1,Preventive Maintenance,2022-03-01 10:00:00
1,2,2,Corrective Maintenance,2022-03-02 14:30:00
2,3,1,Corrective Maintenance,2022-03-03 08:00:00


In [10]:
maintenance_raw_df.drop(columns= ['ID'], inplace = True)


In [37]:
maintenance_raw_df.rename(columns = {'equipment_ID':'ID'}, inplace = True)
maintenance_raw_df.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 3 entries, 0 to 2
Data columns (total 3 columns):
 #   Column            Non-Null Count  Dtype         
---  ------            --------------  -----         
 0   ID                3 non-null      int64         
 1   maintenance_type  3 non-null      object        
 2   timestamp         3 non-null      datetime64[ns]
dtypes: datetime64[ns](1), int64(1), object(1)
memory usage: 200.0+ bytes


Network Data

In [12]:
network_raw_df.head()

Unnamed: 0,ID,date,time,sensor_reading
0,1,2022-03-01,08:00:00,0.58
1,1,2022-03-01,08:15:00,0.62
2,1,2022-03-01,08:30:00,0.6
3,2,2022-03-01,08:00:00,0.89
4,2,2022-03-01,08:15:00,0.85


In [13]:
network_raw_df.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 6 entries, 0 to 5
Data columns (total 4 columns):
 #   Column          Non-Null Count  Dtype  
---  ------          --------------  -----  
 0   ID              6 non-null      int64  
 1   date            6 non-null      object 
 2   time            6 non-null      object 
 3   sensor_reading  6 non-null      float64
dtypes: float64(1), int64(1), object(2)
memory usage: 320.0+ bytes


In [14]:
network_raw_df['date'] = pd.to_datetime(network_raw_df['date'])
network_raw_df['time'] = pd.to_datetime(network_raw_df['time'], format='%H:%M:%S').dt.time
network_raw_df['timestamp'] = pd.to_datetime(network_raw_df['date'].astype(str) + ' ' + network_raw_df['time'].astype(str))
network_raw_df = network_raw_df.drop(['date', 'time'], axis=1)
network_raw_df.head()

Unnamed: 0,ID,sensor_reading,timestamp
0,1,0.58,2022-03-01 08:00:00
1,1,0.62,2022-03-01 08:15:00
2,1,0.6,2022-03-01 08:30:00
3,2,0.89,2022-03-01 08:00:00
4,2,0.85,2022-03-01 08:15:00


In [27]:
network_raw_df.rename(columns = {'sensor_reading':'network_sensor_reading'}, inplace = True)
network_raw_df

Unnamed: 0,ID,network_sensor_reading,timestamp
0,1,0.58,2022-03-01 08:00:00
1,1,0.62,2022-03-01 08:15:00
2,1,0.6,2022-03-01 08:30:00
3,2,0.89,2022-03-01 08:00:00
4,2,0.85,2022-03-01 08:15:00
5,2,0.88,2022-03-01 08:30:00


Merge the data sets

In [45]:
# Merge the three DataFrames
merged_df = equipment_raw_df.merge(maintenance_raw_df, on=['ID', 'timestamp'], how='left')
merged_df = merged_df.merge(network_raw_df, on=['ID', 'timestamp'], how='left')
merged_df

Unnamed: 0,ID,equip_sensor_reading,timestamp,maintenance_type,network_sensor_reading
0,1,26.7,2022-03-01 08:00:00,,0.58
1,1,28.4,2022-03-01 08:15:00,,0.62
2,1,27.8,2022-03-01 08:30:00,,0.6
3,2,99.1,2022-03-01 08:00:00,,0.89
4,2,97.5,2022-03-01 08:15:00,,0.85
5,2,98.2,2022-03-01 08:30:00,,0.88


In [46]:
merged_df.columns

Index(['ID', 'equip_sensor_reading', 'timestamp', 'maintenance_type',
       'network_sensor_reading'],
      dtype='object')

# Load to Database

In [48]:
host="postgres://gzdrgzij:HV9rNO543X3jJWg6LH9iFwgYhDyeV9MQ@trumpet.db.elephantsql.com/gzdrgzij"
port=5432
dbname="gzdrgzij"
user="gzdrgzij"
password="HV9rNO543X3jJWg6LH9iFwgYhDyeV9MQ"

# Establish connections
conn_string = f"postgresql://{user}:{password}@{host}/{dbname}"
db = create_engine(conn_string)
conn = db.raw_connection()
conn.autocommit = True

# create a cursor object
cur = conn.cursor()

create_table_query = """
CREATE TABLE IF NOT EXISTS equipment_records (
    id SERIAL PRIMARY KEY,
    equipment_id VARCHAR(64),
    sensor_date_time TIMESTAMP,
    equipment_sensor_reading NUMERIC(10,4),
    network_sensor_reading NUMERIC(10,4),
    maintenance_date_time TIMESTAMP,
    maintenance_type VARCHAR(255)
);
"""

try:
  cur.execute(create_table_query)
  conn.commit()
except:
  print("Table Creation failed!")
finally:
  merged_df.to_sql('sensor_data', db, if_exists='append', index=False)
  cur.execute("SELECT * from equipment_records")
  print(cur.fetchone())
  # cur.execute('Drop table sensor_data')
  cur.close()
  conn.close()

ValueError: ignored

In [43]:
# Database connection details
db_user = 'gzdrgzij'
db_password = 'HV9rNO543X3jJWg6LH9iFwgYhDyeV9MQ'
db_host = 'postgres://gzdrgzij:HV9rNO543X3jJWg6LH9iFwgYhDyeV9MQ@trumpet.db.elephantsql.com/gzdrgzij'
db_port = '5432'  # Default PostgreSQL port
db_name = 'gzdrgzij'

# Create a connection to the PostgreSQL database
engine = create_engine(f'postgresql://{db_user}:{db_password}@{db_host}:{db_port}/{db_name}')

# Save the DataFrame to the database table
table_name = 'your_table_name'
merged_df.to_sql(table_name, engine, if_exists='replace', index=False)

ValueError: ignored