In [None]:
!pip install redis

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/


In [None]:
!pip install psycopg2

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/


In [None]:
import pandas as pd
import psycopg2
import redis
from io import BytesIO

In [None]:

r = redis.Redis(
  host='redis-10441.c301.ap-south-1-1.ec2.cloud.redislabs.com',
  port=10441,
  password='QAfDAHOgHsGDaqZwFrXxyFqozH94Htj1')

In [None]:
# Set a key-value pair in Redis 
r.set('key', 'value') 

# Retrieve the value from the key 
value = r.get('key') 

print("The value stored in Redis for key 'key' is: %s" % value)

The value stored in Redis for key 'key' is: b'value'


In [None]:
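def extract_data(filename):
    # check if the data is cached in Redis (one GET instead of EXISTS + GET,
    # so the key cannot disappear between the two calls)
    cached = r.get('customer_call_logs')
    if cached is not None:
        # read_pickle expects a path or file-like object, so wrap the raw bytes
        return pd.read_pickle(BytesIO(cached))
    # extract data from CSV file
    df = pd.read_csv(filename)
    # cache data in Redis as pickled bytes
    with BytesIO() as f:
        df.to_pickle(f)
        r.set('customer_call_logs', f.getvalue())
    return df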

In [None]:
def transform_data(df):
    # clean data; copy so the column assignments below do not write to a view of the input
    df = df.dropna().copy()
    # restructure data
    df['call_duration'] = df['call_duration'].apply(lambda x: int(x.split(':')[0]) * 60 + int(x.split(':')[1]))
    df['call_destination'] = df['call_destination'].apply(lambda x: '+1' + x.replace('-', ''))
    # format data
    df['call_date'] = pd.to_datetime(df['call_date'], format='%d-%m-%Y')
    return df

In [None]:
!curl ipecho.net/plain

35.185.116.123

In [None]:
# Postgres Database Information
Postgres_host = '34.134.47.105'
Postgres_database = 'Redis_db'
Postgres_user = 'student3'
Postgres_password = 'Test12'

In [None]:
def load_data(df, dbname, user, password, host):
    # connect to database using the arguments passed in, not the module-level globals
    conn = psycopg2.connect(dbname=dbname, user=user, password=password, host=host)
    # create table
    cur = conn.cursor()
    cur.execute('''
        CREATE TABLE IF NOT EXISTS customer_call_logs
        (
            call_date DATE,
            call_duration INTEGER,
            call_destination VARCHAR(20),
            call_cost FLOAT
        );
    ''')
    conn.commit()
    # load data into table
    for row in df.itertuples(index=False):
        cur.execute('''
            INSERT INTO customer_call_logs (call_date, call_duration, call_destination, call_cost)
            VALUES (%s, %s, %s, %s)
        ''', row)
    conn.commit()
    cur.close()
    conn.close()


In [None]:
def data_pipeline():
    # pass the path straight to extract_data so the CSV is only read on a cache miss
    filename = 'customer_call_logs.csv'
    extracted_data = extract_data(filename)
    transformed_data = transform_data(extracted_data)
    load_data(transformed_data, Postgres_database, Postgres_user, Postgres_password, Postgres_host)
    


In [None]:
if __name__ == '__main__':
    data_pipeline()

**Documentation for Redis-Python Pipeline**

This document outlines the implementation of a Redis-Python pipeline for extracting, transforming, and loading (ETL) data into a PostgreSQL database. The pipeline reads call logs from a CSV file, caches the extracted data in Redis, cleans and formats it, and finally loads the result into a PostgreSQL database.
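To make the clean-and-format step concrete, here is a minimal sketch that applies the same rules as transform_data to a three-row sample. The column names come from the table definition above; the sample values are made up for illustration, and the vectorized string operations are a substitute for the original apply calls, not the notebook's own code.

```python
import pandas as pd

def transform_sample(df):
    # Drop incomplete rows; copy so later assignments do not touch the input
    df = df.dropna().copy()
    # Split "A:B" durations and combine them as A * 60 + B, as transform_data does
    parts = df['call_duration'].str.split(':', expand=True).astype(int)
    df['call_duration'] = parts[0] * 60 + parts[1]
    # Strip dashes and add the +1 prefix
    df['call_destination'] = '+1' + df['call_destination'].str.replace('-', '', regex=False)
    # Parse day-month-year dates
    df['call_date'] = pd.to_datetime(df['call_date'], format='%d-%m-%Y')
    return df

# Hypothetical sample rows; the last one has a missing date and is dropped
sample = pd.DataFrame({
    'call_date': ['01-02-2023', '15-03-2023', None],
    'call_duration': ['02:30', '10:05', '00:45'],
    'call_destination': ['555-123-4567', '555-987-6543', '555-000-0000'],
    'call_cost': [1.25, 4.10, 0.30],
})
result = transform_sample(sample)
print(result)
```

The row with the missing date is removed before any parsing happens, so only complete rows reach the conversion steps.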

**Best Practices**

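a) I recommend using Redis for caching. On a re-run the pipeline reads the pickled DataFrame from Redis instead of parsing the CSV file again, which shortens the extract step. Because the key is stored without an expiry, delete it or set a time-to-live when the source file changes; otherwise the pipeline keeps returning the old data.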

b) It is also good to use BytesIO for pickling. To cache data in Redis, the DataFrame is first pickled and then stored as a byte string. The BytesIO class from the io module holds the pickled data in memory, so no temporary file has to be written to disk.

c) Validate data during transformation. The pipeline drops rows with missing values and converts durations, phone numbers, and dates to a consistent format before loading them into PostgreSQL. It does not catch exceptions itself, so a malformed value (for example a date that is not in DD-MM-YYYY form) stops the run with an error instead of being loaded.

d) Storing the final data in a database, e.g. PostgreSQL, is a good practice. PostgreSQL is used as the final data storage layer. It provides durable storage, typed columns and constraints for data integrity, and role-based access control.
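The caching and pickling practices above can be sketched without a live Redis server. In the example below, FakeCache, cached_frame, and ttl_seconds are hypothetical names introduced for illustration; FakeCache only mimics the get and set calls of redis.Redis, and its ex argument (expiry in seconds, which redis-py's set also accepts) is ignored. Note that unpickling runs arbitrary code, so this pattern is only appropriate when the Redis instance is trusted.

```python
from io import BytesIO

import pandas as pd

class FakeCache:
    """Hypothetical in-memory stand-in for the get/set subset of redis.Redis."""
    def __init__(self):
        self._store = {}

    def get(self, key):
        return self._store.get(key)

    def set(self, key, value, ex=None):
        # ex is accepted for signature compatibility but not enforced here
        self._store[key] = value
        return True

def cached_frame(cache, key, loader, ttl_seconds=3600):
    """Return the DataFrame cached under key, or build it with loader() and cache it."""
    raw = cache.get(key)
    if raw is not None:
        # read_pickle needs a path or file-like object, so wrap the bytes
        return pd.read_pickle(BytesIO(raw))
    df = loader()
    buf = BytesIO()
    df.to_pickle(buf)
    cache.set(key, buf.getvalue(), ex=ttl_seconds)
    return df

calls = []

def load():
    # Stands in for pd.read_csv; records how often the slow path runs
    calls.append(1)
    return pd.DataFrame({'call_cost': [1.25, 4.10]})

cache = FakeCache()
first = cached_frame(cache, 'customer_call_logs', load)
second = cached_frame(cache, 'customer_call_logs', load)
print(len(calls), second.equals(first))
```

Passing a real redis.Redis client in place of FakeCache should work the same way, with the expiry then enforced by the server.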


**Recommendations for Deployment**

a) Deploying the pipeline on cloud-based infrastructure such as AWS or Google Cloud Platform is recommended. This provides scalability, flexibility, and cost-effectiveness.

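b) To secure sensitive information such as database credentials and API keys, a secrets management tool such as Vault or AWS Secrets Manager is recommended. The Redis and PostgreSQL passwords that are hard-coded in the cells above should be moved there before deployment.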

c) To ensure the pipeline is running smoothly, logging and monitoring tools such as ELK Stack or Prometheus are recommended. These tools provide visibility into the pipeline and allow for easy debugging in case of errors.
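As a starting point for the secrets and logging recommendations above, the sketch below reads connection settings from environment variables and wraps each pipeline step in log messages, using only the standard library. The variable names (POSTGRES_HOST and so on), load_settings, and run_step are assumptions made for this example; a secrets manager or a log shipper would sit behind the same interface.

```python
import logging
import os

logging.basicConfig(level=logging.INFO,
                    format='%(asctime)s %(levelname)s %(message)s')
logger = logging.getLogger('etl_pipeline')

# Hypothetical variable names; use whatever your deployment defines
REQUIRED = ('POSTGRES_HOST', 'POSTGRES_DB', 'POSTGRES_USER', 'POSTGRES_PASSWORD')

def load_settings(env=os.environ):
    """Collect required settings from a mapping, failing early if any are absent."""
    missing = [name for name in REQUIRED if not env.get(name)]
    if missing:
        raise RuntimeError('Missing environment variables: ' + ', '.join(missing))
    return {name.lower(): env[name] for name in REQUIRED}

def run_step(name, func, *args):
    """Run one pipeline step, logging its start, end, and any failure."""
    logger.info('starting %s', name)
    try:
        result = func(*args)
    except Exception:
        # logger.exception records the traceback, then the error propagates
        logger.exception('%s failed', name)
        raise
    logger.info('finished %s', name)
    return result

# Demonstration with an explicit mapping instead of the real environment
settings = load_settings({
    'POSTGRES_HOST': 'localhost',
    'POSTGRES_DB': 'example_db',
    'POSTGRES_USER': 'example_user',
    'POSTGRES_PASSWORD': 'example_password',
})
doubled = run_step('double', lambda x: x * 2, 21)
```

In the notebook, the steps could then be called as run_step('extract', extract_data, filename) and so on, with the settings dictionary supplying the arguments for load_data.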

**Conclusion**

In conclusion, the Redis-Python pipeline is an efficient and effective way of extracting, transforming, and loading data into a PostgreSQL database. By following best practices and deploying the pipeline on a cloud-based infrastructure, the pipeline can be easily managed, scaled, and secured.