In [1]:
# Install necessary packages
# Install the required libraries if not installed
!pip install pandas requests psycopg2 sqlalchemy

import getpass
import os
from io import StringIO

import pandas as pd
import psycopg2
import requests
from psycopg2 import sql
from sqlalchemy import create_engine
from sqlalchemy.engine import URL





[notice] A new release of pip available: 22.3.1 -> 25.0.1
[notice] To update, run: C:\Users\Abhi0618\AppData\Local\Microsoft\WindowsApps\PythonSoftwareFoundation.Python.3.10_qbz5n2kfra8p0\python.exe -m pip install --upgrade pip


In [None]:

# Step 1: Download the raw working file
url = "https://data-engineer-technical-challenge.s3.ap-southeast-2.amazonaws.com/pace-data.txt"
# The timeout keeps the cell from hanging indefinitely on a stalled connection.
# raise_for_status() fails fast on an HTTP error (e.g. 403/404) instead of
# letting the error page be parsed below as if it were the dataset.
response = requests.get(url, timeout=60)
response.raise_for_status()
# NOTE(review): `.text` decodes using the charset from the response headers;
# confirm the file's encoding if non-ASCII values come out garbled.
raw_text = response.text  # Get the content of the file as a string

# Step 2: Use StringIO to treat the string as if it were a file
data_io = StringIO(raw_text)

# Step 3: Read the CSV-formatted text using pandas
df = pd.read_csv(data_io)

# Step 4: Save the initial CSV locally
df.to_csv("pace-data.csv", index=False)
print("✅ CSV successfully saved as 'pace-data.csv'")

# Check the first few rows to confirm the data is loaded correctly
print(df.head(5))

# Step 5: Normalize MovementDateTime to ISO format (seconds precision, no timezone suffix)
df['MovementDateTime'] = pd.to_datetime(df['MovementDateTime']).dt.strftime('%Y-%m-%dT%H:%M:%S')
print("Normalized MovementDateTime to ISO format")

# Preview the normalized data
print(df.head())

# Save the normalized data to CSV
df.to_csv("pace-data-normalized.csv", index=False)

# Step 6: Convert Speed column to numeric; unparseable entries become NaN
df['Speed'] = pd.to_numeric(df['Speed'], errors='coerce')

# Step 7: Function to fill missing or zero speeds for ships under way using engine
def fill_missing_speeds(group):
    """Fill missing/zero speeds of under-way rows with the group's mean valid speed.

    Intended for ``DataFrame.groupby('CallSign').apply``. Every row whose
    ``MoveStatus`` is ``'Under way using engine'`` and whose ``Speed`` is NaN
    or 0 receives the mean of that group's under-way speeds that are > 0.

    Parameters
    ----------
    group : pandas.DataFrame
        Rows of one group. It must contain a ``MoveStatus`` column and a numeric
        ``Speed`` column. It is not modified.

    Returns
    -------
    pandas.DataFrame
        A copy of ``group`` with the filled ``Speed`` values. If the group has
        no under-way row with a positive speed, the mean is NaN. In that case the
        affected rows (zeros included) end up NaN.
    """
    # Work on a copy: pandas does not support mutating the object handed to
    # groupby().apply(), and direct callers keep their frame intact.
    group = group.copy()
    speed = group['Speed']
    underway_mask = group['MoveStatus'] == 'Under way using engine'

    # NaN > 0 evaluates to False, so this also excludes missing speeds.
    mean_speed = speed[underway_mask & (speed > 0)].mean()

    needs_fill = underway_mask & (speed.isna() | (speed == 0))
    group.loc[needs_fill, 'Speed'] = mean_speed
    return group

# Step 8: Apply the function to each CallSign group to fill missing speeds.
# dropna=False keeps rows whose CallSign is missing. With the default
# (dropna=True), those rows belong to no group and are dropped from the result
# of apply(), which silently shrinks the dataset.
df = df.groupby('CallSign', group_keys=False, dropna=False).apply(fill_missing_speeds)

# Step 9: Calculate BeamRatio = Beam / Length
# Coerce to numeric (as is done for Speed) so a stray non-numeric entry becomes
# NaN instead of making the division raise. The Beam and Length columns
# themselves are left unchanged.
beam = pd.to_numeric(df['Beam'], errors='coerce')
length = pd.to_numeric(df['Length'], errors='coerce')

# A zero Length yields +/-inf; record those ratios as missing (NaN keeps the column float).
df['BeamRatio'] = (beam / length).replace([float('inf'), -float('inf')], float('nan'))

# Step 10: Save the enriched data as 'enriched.csv'
df.to_csv('enriched.csv', index=False)
print("✅ Enriched CSV saved as 'enriched.csv'")


In [None]:

# Step 11: Connect to PostgreSQL and insert data

# Connection parameters come from the standard libpq environment variables,
# with the previous values as defaults. The password is never hard-coded: it is
# read from PGPASSWORD or prompted for interactively.
dbname = os.environ.get('PGDATABASE', 'riotintochallengedb')
user = os.environ.get('PGUSER', 'postgres')
host = os.environ.get('PGHOST', 'localhost')
port = os.environ.get('PGPORT', '5432')
password = os.environ.get('PGPASSWORD') or getpass.getpass(f'PostgreSQL password for {user}@{host}: ')

table_name = 'ship_data'  # The table name in PostgreSQL

# Step 12: Create the table (if not already created) with one TEXT column per DataFrame column.
# sql.Identifier quotes (and escapes) every name, so column headers taken from
# the downloaded file cannot break or inject into the DDL statement.
create_table_query = sql.SQL('CREATE TABLE IF NOT EXISTS {table} ({columns})').format(
    table=sql.Identifier(table_name),
    columns=sql.SQL(', ').join(
        sql.SQL('{} TEXT').format(sql.Identifier(col)) for col in df.columns
    ),
)

conn = psycopg2.connect(dbname=dbname, user=user, password=password, host=host, port=port)
try:
    # `with conn` commits on success and rolls back on error, but does not close;
    # the finally clause closes the connection even if the DDL fails.
    with conn, conn.cursor() as cursor:
        cursor.execute(create_table_query)
finally:
    conn.close()

# Step 13: Insert data into the table using pandas `to_sql` via SQLAlchemy.
# URL.create escapes the credentials, so a password containing '@', ':' or '/'
# cannot corrupt the connection string the way an f-string URL would.
# NOTE: if_exists='append' inserts every row again each time this cell is re-run.
engine = create_engine(URL.create(
    'postgresql+psycopg2',
    username=user,
    password=password,
    host=host,
    port=int(port),
    database=dbname,
))
try:
    df.to_sql(table_name, engine, if_exists='append', index=False)
finally:
    # Step 14: Release the pooled connections
    engine.dispose()

print("✅ Data has been inserted into PostgreSQL successfully.")
