## Import Packages and Libraries

In [2]:
import pandas as pd
import boto3
from io import StringIO 
import psycopg2
from dotenv import load_dotenv 
import os

# Getting the Data

- Getting data from: [Cycling Data](https://cycling.data.tfl.gov.uk/)

In [4]:
url = 'https://cycling.data.tfl.gov.uk/usage-stats/384JourneyDataExtract15Nov2023-30Nov2023.csv'
storage_options = {'User-Agent': 'Mozilla/5.0'}
df = pd.read_csv(url, storage_options=storage_options)
df.head()

Unnamed: 0,Number,Start date,Start station number,Start station,End date,End station number,End station,Bike number,Bike model,Total duration,Total duration (ms)
0,135967971,2023-11-30 23:59,970,"Scala Street, Fitzrovia",2023-12-01 00:00,2691,"Howland Street, Fitzrovia",52164,CLASSIC,1m 47s,107899
1,135967972,2023-11-30 23:59,3478,"Guildhouse Street, Victoria",2023-12-01 00:06,1165,"Riverlight North, Nine Elms",60462,PBSC_EBIKE,7m 1s,421332
2,135967973,2023-11-30 23:59,2660,"Frith Street, Soho",2023-12-01 00:18,200151,"Hoxton Station, Hoxton",58804,CLASSIC,18m 59s,1139805
3,135967974,2023-11-30 23:59,3458,"Waterloo Place, St. James's",2023-12-01 00:08,3465,"Eccleston Place, Victoria",20077,CLASSIC,8m 17s,497254
4,135967975,2023-11-30 23:59,1020,"Leonard Circus , Shoreditch",2023-12-01 00:12,22173,"Old Ford Road, Bethnal Green",53834,CLASSIC,12m 23s,743051


# Define functions and constants

## constants

In [5]:
load_dotenv()

s3 = boto3.resource('s3')
bucket_name = 'bicycle-rental-london'
file_name = 'data.csv'
region_name = 'us-east-1'
cluster_params = {
    'ClusterIdentifier': 'bicycle-london-cluster',
    'NodeType': 'dc2.large',
    'MasterUsername': os.environ['MASTER_USER_NAME'],
    'MasterUserPassword': os.environ['MasterUserPassword'],
    'DBName': os.environ['DB_NAME'],
    'ClusterType': 'single-node',
    'PubliclyAccessible': True,    
}

table_name = 'bike_rental_data'


## Functions

In [6]:
def create_bucket(bucket_name):
    response = s3.create_bucket(
        Bucket=bucket_name,
    )
    print("Bucket created:", bucket_name)
    return response

In [24]:
def delete_bucket(bucket_name : str, file_name : str):
    client = boto3.client('s3')
    client.delete_object(Bucket = bucket_name, Key = file_name)
    response = client.delete_bucket(
        Bucket=bucket_name
    )
    print("Bucket deleted:", bucket_name)
    return response

# Put dataframe inside data lake (S3 Bucket)

## Creating bucket

In [16]:
create_bucket(bucket_name=bucket_name)

Bucket created: bycle-rental-london


s3.Bucket(name='bycle-rental-london')

In [71]:
# Check if it was created
for bucket in s3.buckets.all():
    print(bucket.name)

bycle-rental-london


## Put the data

In [77]:
df.drop('Unnamed: 0', axis= 1, inplace=True) # removing duplicate index
df.index.name = 'Index'
df

Unnamed: 0_level_0,Number,Start date,Start station number,Start station,End date,End station number,End station,Bike number,Bike model,Total duration,Total duration (ms)
index,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1,Unnamed: 6_level_1,Unnamed: 7_level_1,Unnamed: 8_level_1,Unnamed: 9_level_1,Unnamed: 10_level_1,Unnamed: 11_level_1
0,135292553,2023-10-31 23:59,1009,"Taviton Street, Bloomsbury",2023-11-01 00:20,1153,"Pall Mall East, West End",54429,CLASSIC,20m 24s,1224353
1,135292554,2023-11-01 00:00,300012,"Irene Road, Parsons Green",2023-11-01 00:06,300058,"The Vale, Chelsea",60442,PBSC_EBIKE,6m 18s,378747
2,135292549,2023-10-31 23:58,300080,"Culvert Road, Battersea",2023-11-01 00:09,1135,"Claverton Street, Pimlico",53480,CLASSIC,10m 20s,620273
3,135292550,2023-10-31 23:58,300249,"Westminster Pier, Westminster",2023-11-01 00:03,1219,"Lower Marsh, Waterloo",20962,CLASSIC,4m 4s,244652
4,135292551,2023-10-31 23:59,1228,"Southampton Street, Strand",2023-11-01 00:17,200195,"St Martins Close, Camden Town",57448,CLASSIC,18m 29s,1109695
...,...,...,...,...,...,...,...,...,...,...,...
383799,134901998,2023-10-15 00:00,3506,"Stanhope Gate, Mayfair",2023-10-15 00:04,1230,"Millennium Hotel, Mayfair",55433,CLASSIC,3m 54s,234020
383800,134901999,2023-10-15 00:00,3500,"Baldwin Street, St. Luke's",2023-10-15 00:32,200156,"Pott Street, Bethnal Green",60528,PBSC_EBIKE,31m 39s,1899342
383801,134902000,2023-10-15 00:00,200081,"Coomer Place, West Kensington",2023-10-15 00:15,200197,"Barons Court Station, West Kensington",54329,CLASSIC,14m 13s,853745
383802,134902001,2023-10-15 00:01,3486,"St. Luke's Church, Chelsea",2023-10-15 00:12,1195,"St. George's Square, Pimlico",57259,CLASSIC,11m 58s,718906


In [78]:
csv_buffer = StringIO()
df.to_csv(csv_buffer)
s3.Object(bucket_name, file_name).put(Body=csv_buffer.getvalue())

{'ResponseMetadata': {'RequestId': 'VMHXHA9ECQWR58JD',
  'HostId': 'v57v6cMTAU96vOsgGGiSujwAWrKZZdP97QVIo8uXkGuyZs4mRNY9+ak74gaB4sDzXU9CJbYzDhM=',
  'HTTPStatusCode': 200,
  'HTTPHeaders': {'x-amz-id-2': 'v57v6cMTAU96vOsgGGiSujwAWrKZZdP97QVIo8uXkGuyZs4mRNY9+ak74gaB4sDzXU9CJbYzDhM=',
   'x-amz-request-id': 'VMHXHA9ECQWR58JD',
   'date': 'Sat, 13 Apr 2024 16:44:23 GMT',
   'x-amz-server-side-encryption': 'AES256',
   'etag': '"ef598ce6797a6894f801501b743698f7"',
   'server': 'AmazonS3',
   'content-length': '0'},
  'RetryAttempts': 0},
 'ETag': '"ef598ce6797a6894f801501b743698f7"',
 'ServerSideEncryption': 'AES256'}

## Read the data from the Data Lake

In [80]:
response = s3.Object(bucket_name, file_name).get()
data = response['Body'].read().decode('utf-8')
df = pd.read_csv(StringIO(data))
df.set_index('Index', inplace=True)
df.head()

Unnamed: 0_level_0,Number,Start date,Start station number,Start station,End date,End station number,End station,Bike number,Bike model,Total duration,Total duration (ms)
index,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1,Unnamed: 6_level_1,Unnamed: 7_level_1,Unnamed: 8_level_1,Unnamed: 9_level_1,Unnamed: 10_level_1,Unnamed: 11_level_1
0,135292553,2023-10-31 23:59,1009,"Taviton Street, Bloomsbury",2023-11-01 00:20,1153,"Pall Mall East, West End",54429,CLASSIC,20m 24s,1224353
1,135292554,2023-11-01 00:00,300012,"Irene Road, Parsons Green",2023-11-01 00:06,300058,"The Vale, Chelsea",60442,PBSC_EBIKE,6m 18s,378747
2,135292549,2023-10-31 23:58,300080,"Culvert Road, Battersea",2023-11-01 00:09,1135,"Claverton Street, Pimlico",53480,CLASSIC,10m 20s,620273
3,135292550,2023-10-31 23:58,300249,"Westminster Pier, Westminster",2023-11-01 00:03,1219,"Lower Marsh, Waterloo",20962,CLASSIC,4m 4s,244652
4,135292551,2023-10-31 23:59,1228,"Southampton Street, Strand",2023-11-01 00:17,200195,"St Martins Close, Camden Town",57448,CLASSIC,18m 29s,1109695


# Moving the data from the Data Lake to Data Warehouse

## Creating Data Warehouse

In [81]:
redshift = boto3.client('redshift',  region_name=region_name)
response = redshift.create_cluster(**cluster_params)
print("Cluster created:", response['Cluster']['ClusterIdentifier'])

Cluster created: bycle-london-cluster


In [5]:
cluster_id = response['Cluster']['ClusterIdentifier']
create_waiter = redshift.get_waiter("cluster_available")
create_waiter.wait(
        ClusterIdentifier = cluster_id,
        WaiterConfig = {
            "Delay": 30,
            "MaxAttempts": 20
        }
    ) # Wait for Creating of cluster

cluster_details = redshift.describe_clusters(ClusterIdentifier=cluster_id)

In [6]:
endpoint = cluster_details['Clusters'][0]['Endpoint']
port = endpoint['Port']
host = endpoint['Address']
username = cluster_params['MasterUsername']
password = cluster_params['MasterUserPassword']
database = cluster_params['DBName']

## Put this information into the DBT
print("\nConnection Settings:")
print("Host:", host)
print("Port:", port)
print("Database:", database)
#print("Username:", username)
#print("Password:", password)


Connection Settings:
Host: bycle-london-cluster.c16o5fuq8xj9.us-east-1.redshift.amazonaws.com
Port: 5439
Database: dev


In [103]:
# Permit inbound traffic to the dataset
security_group_id = cluster_details['Clusters'][0]['VpcSecurityGroups'][0]['VpcSecurityGroupId']
try:
    ec2 = boto3.client('ec2', region_name=region_name)
    ec2.authorize_security_group_ingress(
        GroupId=security_group_id,
        IpProtocol='TCP',
        FromPort=5439,
        ToPort=5439,
        CidrIp='0.0.0.0/0'
    )
    print("Inbound traffic on port 5439 is now allowed from any IP address.")
except Exception as e:
    print("An error occurred while authorizing ingress:", e)

Inbound traffic on port 5439 is now allowed from any IP address.


In [97]:
try:
    conn = psycopg2.connect(
        host=host, 
        port=port, 
        database=database, 
        user = username, 
        password = password
    )
    print("Connected to Redshift!")
except Exception as e:
    print("Unable to connect to Redshift:", e)

Connected to Redshift!


## Moving the data

In [100]:
## Deprecated mode to pass the data. It takes arounds 19 hours to complete
if False:
    tqdm.pandas()

    cur = conn.cursor()

    create_table_query = """
    CREATE TABLE IF NOT EXISTS bike_rental_data (
        "Index" INT,
        "Number" INT,
        "Start_date" TIMESTAMP,
        "Start_station_number" INT,
        "Start_station" VARCHAR(255),
        "End_date" TIMESTAMP,
        "End_station_number" INT,
        "End_station" VARCHAR(255),
        "Bike_number" INT,
        "Bike_model" VARCHAR(255),
        "Total_duration" VARCHAR(255),
        "Total_duration_ms" INT
    );
    """

    cur.execute(create_table_query)

    def insert_row(cur, row):
        insert_query = """
        INSERT INTO bike_rental_data ("index", "Number", "Start_date", "Start_station_number", 
        "Start_station", "End_date", "End_station_number", "End_station", "Bike_number", 
        "Bike_model", "Total_duration", "Total_duration_ms") 
        VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s);
        """
        cur.execute(insert_query, (row.name, row['Number'], row['Start date'], 
                                row['Start station number'], row['Start station'], 
                                row['End date'], row['End station number'], 
                                row['End station'], row['Bike number'], 
                                row['Bike model'], row['Total duration'], 
                                row['Total duration (ms)']))

    df.progress_apply(lambda row: insert_row(cur, row), axis=1)

    conn.commit()
    cur.close()
    conn.close()

In [37]:
import boto3
session = boto3.Session()
credentials = session.get_credentials()

<botocore.credentials.Credentials object at 0x0000021D5AF86950>


In [None]:
cur = conn.cursor()

sql_query = f"""DROP TABLE IF EXISTS {table_name} CASCADE"""
cur.execute(sql_query)
conn.commit()

create_table_query = f"""
CREATE TABLE IF NOT EXISTS {table_name} (
    "index" INT,
    "Number" INT,
    "Start_date" TIMESTAMP,
    "Start_station_number" INT,
    "Start_station" VARCHAR(255),
    "End_date" TIMESTAMP,
    "End_station_number" INT,
    "End_station" VARCHAR(255),
    "Bike_number" INT,
    "Bike_model" VARCHAR(255),
    "Total_duration" VARCHAR(255),
    "Total_duration_ms" INT
);
"""
cur.execute(create_table_query)
conn.commit()

file_path = f"s3://{bucket_name}/{file_name}"

# Copy data
copy_query = f"""
COPY {table_name}
FROM '{file_path}'
CREDENTIALS 'aws_access_key_id={credentials.access_key};aws_secret_access_key={credentials.secret_key}'
REGION 'us-west-2'
IGNOREHEADER 1
CSV;
"""

cur.execute(copy_query)
conn.commit()
conn.close()

## Data Analytics

In [130]:
df.columns

Index(['Unnamed: 0', 'Number', 'Start date', 'Start station number',
       'Start station', 'End date', 'End station number', 'End station',
       'Bike number', 'Bike model', 'Total duration', 'Total duration (ms)'],
      dtype='object')

In [137]:
len(pd.unique(df['Start station number']))

798

In [138]:
len(pd.unique(df['End station number']))

800

In [136]:
len(pd.unique(df['Bike number']))

11787

In [142]:
print(f'Average total duration: {(df["Total duration (ms)"].mean() / 60000):.2f} minutes') 

Average total duration: 20.75 minutes


In [145]:
print(f'Maximum total duration: {(df["Total duration (ms)"].max() / (60000* 60 * 24)):.2f} days') 

Maximum total duration: 13.24 days


In [153]:
len(pd.unique(df['Start station'].apply(lambda x : x.split(',')[1].strip())))

124

In [160]:
len(pd.unique(df['End station'].apply(lambda x :x.split(',')[1].strip()  if len(x.split(',')) > 1 else None)))

125

In [166]:
pd.unique(df['Start date'].apply(lambda x: x.split(' ')[0]))

array(['2023-10-31', '2023-11-01', '2023-10-30', '2023-10-29',
       '2023-10-28', '2023-10-27', '2023-10-26', '2023-10-25',
       '2023-10-24', '2023-10-23', '2023-10-22', '2023-10-21',
       '2023-10-20', '2023-10-19', '2023-10-18', '2023-10-17',
       '2023-10-16', '2023-10-15'], dtype=object)

In [167]:
pd.unique(df['End date'].apply(lambda x: x.split(' ')[0]))

array(['2023-11-01', '2023-10-31', '2023-11-05', '2023-11-02',
       '2023-11-03', '2023-11-07', '2023-10-30', '2023-11-06',
       '2023-11-04', '2023-10-29', '2023-11-10', '2023-11-08',
       '2023-10-28', '2023-10-27', '2023-10-26', '2023-10-25',
       '2023-10-24', '2023-10-23', '2023-10-22', '2023-10-21',
       '2023-10-20', '2023-10-19', '2023-10-18', '2023-10-17',
       '2023-10-16', '2023-10-15'], dtype=object)

## Deleting Everything from the cloud
After you make your analysis, it's better to remove everything from the cloud

In [9]:
is_analysis_done = False

In [27]:
if is_analysis_done:
    delete_bucket(bucket_name, file_name)
    redshift.delete_cluster(ClusterIdentifier = cluster_id, SkipFinalClusterSnapshot  = True)

{'Cluster': {'ClusterIdentifier': 'bycle-london-cluster',
  'NodeType': 'dc2.large',
  'ClusterStatus': 'deleting',
  'ClusterAvailabilityStatus': 'Modifying',
  'MasterUsername': 'admin',
  'DBName': 'dev',
  'Endpoint': {'Address': 'bycle-london-cluster.c16o5fuq8xj9.us-east-1.redshift.amazonaws.com',
   'Port': 5439},
  'ClusterCreateTime': datetime.datetime(2024, 4, 12, 14, 40, 25, 213000, tzinfo=tzutc()),
  'AutomatedSnapshotRetentionPeriod': 1,
  'ManualSnapshotRetentionPeriod': -1,
  'ClusterSecurityGroups': [],
  'VpcSecurityGroups': [{'VpcSecurityGroupId': 'sg-05c55aa288a5c1347',
    'Status': 'active'}],
  'ClusterParameterGroups': [{'ParameterGroupName': 'default.redshift-1.0',
    'ParameterApplyStatus': 'in-sync'}],
  'ClusterSubnetGroupName': 'default',
  'VpcId': 'vpc-0b4150ff1a56ff2c0',
  'AvailabilityZone': 'us-east-1c',
  'PreferredMaintenanceWindow': 'wed:04:30-wed:05:00',
  'PendingModifiedValues': {},
  'ClusterVersion': '1.0',
  'AllowVersionUpgrade': True,
  'Numb