# Transform Index Data

In [None]:
# Replace every missing value in the index CSV with 0
import pandas as pd

csv_file_path = 'D:/01 NUS/QF5214 Data Engineering/TeamOne/index_data.csv'

# Load the file and fill all NaN cells (in every column) with 0 in one step
df = pd.read_csv(csv_file_path).fillna(0)

# Write the result back to the same path, replacing the original file
df.to_csv(csv_file_path, index=False)

In [8]:
import os
import pandas as pd
import psycopg2
from psycopg2 import sql
from io import StringIO

# Database connection parameters
# NOTE(review): credentials are hardcoded in this cell; consider loading them
# from environment variables or a config file kept out of version control.
database = "QF5214"
username = "postgres"
password = "qf5214"
host = "134.122.167.14"
port = 5555

# CSV file path
csv_file_path = 'D:/01 NUS/QF5214 Data Engineering/TeamOne/index_data.csv'

# The schema is fixed; the table name is the CSV file name without extension
schema_name = "datacollection"
table_name = os.path.splitext(os.path.basename(csv_file_path))[0]

# Read CSV file into a DataFrame
df = pd.read_csv(csv_file_path)

# Columns that should be stored as numerical types (column name -> SQL type)
numeric_columns = {
    "Open": "DOUBLE PRECISION",
    "High": "DOUBLE PRECISION",
    "Low": "DOUBLE PRECISION",
    "Close": "DOUBLE PRECISION",
    "Adj_Close": "DOUBLE PRECISION",
    "Volume": "DOUBLE PRECISION",
}

# Columns that should be stored as dates (column name -> SQL type)
datetime_columns = {
    "Date": "DATE",
}

# Convert the listed columns to proper pandas dtypes; unparseable values
# become NaN / NaT instead of raising
for col in numeric_columns:
    if col in df.columns:
        df[col] = pd.to_numeric(df[col], errors='coerce')

for col in datetime_columns:
    if col in df.columns:
        df[col] = pd.to_datetime(df[col], errors='coerce')

# Connect to the PostgreSQL database
conn = psycopg2.connect(
    dbname=database,
    user=username,
    password=password,
    host=host,
    port=port
)
try:
    # `with conn` commits when the body succeeds and rolls back when it
    # raises; `with conn.cursor()` closes the cursor on exit. Neither closes
    # the connection itself, hence the `finally` below.
    with conn, conn.cursor() as cursor:
        # Make the target schema the default for this session.
        # This does not create the schema; it must already exist.
        cursor.execute(sql.SQL("SET search_path TO {};").format(sql.Identifier(schema_name)))

        # Check if the table exists
        cursor.execute(sql.SQL("""
            SELECT EXISTS (
                SELECT FROM information_schema.tables 
                WHERE table_schema = %s AND table_name = %s
            );
        """), (schema_name, table_name))

        table_exists = cursor.fetchone()[0]

        if not table_exists:
            # Create the table with one column per DataFrame column, using
            # the predefined SQL type where one is configured and TEXT otherwise
            column_definitions = []
            for col in df.columns:
                if col in numeric_columns:
                    col_type = numeric_columns[col]
                elif col in datetime_columns:
                    col_type = datetime_columns[col]
                else:
                    col_type = "TEXT"
                column_definitions.append(
                    sql.SQL("{} {}").format(sql.Identifier(col), sql.SQL(col_type))
                )

            create_table_query = sql.SQL("CREATE TABLE {}.{} ({});").format(
                sql.Identifier(schema_name),
                sql.Identifier(table_name),
                sql.SQL(', ').join(column_definitions)
            )
            cursor.execute(create_table_query)

        # Serialize the DataFrame to an in-memory CSV for bulk loading.
        # na_rep must match the NULL marker in the COPY statement below:
        # pandas writes missing values as empty fields by default, which COPY
        # would not treat as NULL once NULL is set to 'NaN' and which are not
        # valid input for the numeric and date columns.
        csv_buffer = StringIO()
        df.to_csv(
            csv_buffer,
            index=False,
            header=False,
            na_rep='NaN',
            date_format='%Y-%m-%d %H:%M:%S'  # ensure the date format is correct
        )
        csv_buffer.seek(0)

        # Use COPY to insert the data from the StringIO buffer
        cursor.copy_expert(
            sql.SQL("COPY {}.{} FROM STDIN WITH CSV NULL 'NaN'").format(
                sql.Identifier(schema_name),
                sql.Identifier(table_name)
            ),
            csv_buffer
        )
finally:
    # Always release the connection, even when the load failed
    conn.close()

print(f"Data successfully written into {schema_name}.{table_name}")


  df[col] = pd.to_datetime(df[col], errors='coerce')  # Convert to datetime, setting invalid values as NaT


Data successfully written into datacollection.index_data


# Transform Stock Data

In [None]:
# Replace every missing value in the stock CSV with 0
import pandas as pd

csv_file_path = 'D:/01 NUS/QF5214 Data Engineering/TeamOne/stock_data.csv'

# Load the file and fill all NaN cells (in every column) with 0 in one step
df = pd.read_csv(csv_file_path).fillna(0)

# Write the result back to the same path, replacing the original file
df.to_csv(csv_file_path, index=False)

In [9]:
import os
import pandas as pd
import psycopg2
from psycopg2 import sql
from io import StringIO

# Database connection parameters
# NOTE(review): credentials are hardcoded in this cell; consider loading them
# from environment variables or a config file kept out of version control.
database = "QF5214"
username = "postgres"
password = "qf5214"
host = "134.122.167.14"
port = 5555

# CSV file path
csv_file_path = 'D:/01 NUS/QF5214 Data Engineering/TeamOne/stock_data.csv'

# The schema is fixed; the table name is the CSV file name without extension
schema_name = "datacollection"
table_name = os.path.splitext(os.path.basename(csv_file_path))[0]

# Read CSV file into a DataFrame
df = pd.read_csv(csv_file_path)

# Columns that should be stored as numerical types (column name -> SQL type)
numeric_columns = {
    "Open": "DOUBLE PRECISION",
    "High": "DOUBLE PRECISION",
    "Low": "DOUBLE PRECISION",
    "Close": "DOUBLE PRECISION",
    "Adj_Close": "DOUBLE PRECISION",
    "Volume": "DOUBLE PRECISION",
    "PE": "DOUBLE PRECISION",
    "PB": "DOUBLE PRECISION",
    "PS": "DOUBLE PRECISION",
    "ROE": "DOUBLE PRECISION",
    "PM": "DOUBLE PRECISION",
    "IN": "DOUBLE PRECISION",
    "Market_Cap": "DOUBLE PRECISION",
}

# Columns that should be stored as dates (column name -> SQL type)
datetime_columns = {
    "Date": "DATE",
}

# Convert the listed columns to proper pandas dtypes; unparseable values
# become NaN / NaT instead of raising
for col in numeric_columns:
    if col in df.columns:
        df[col] = pd.to_numeric(df[col], errors='coerce')

for col in datetime_columns:
    if col in df.columns:
        df[col] = pd.to_datetime(df[col], errors='coerce')

# Connect to the PostgreSQL database
conn = psycopg2.connect(
    dbname=database,
    user=username,
    password=password,
    host=host,
    port=port
)
try:
    # `with conn` commits when the body succeeds and rolls back when it
    # raises; `with conn.cursor()` closes the cursor on exit. Neither closes
    # the connection itself, hence the `finally` below.
    with conn, conn.cursor() as cursor:
        # Make the target schema the default for this session.
        # This does not create the schema; it must already exist.
        cursor.execute(sql.SQL("SET search_path TO {};").format(sql.Identifier(schema_name)))

        # Check if the table exists
        cursor.execute(sql.SQL("""
            SELECT EXISTS (
                SELECT FROM information_schema.tables 
                WHERE table_schema = %s AND table_name = %s
            );
        """), (schema_name, table_name))

        table_exists = cursor.fetchone()[0]

        if not table_exists:
            # Create the table with one column per DataFrame column, using
            # the predefined SQL type where one is configured and TEXT otherwise
            column_definitions = []
            for col in df.columns:
                if col in numeric_columns:
                    col_type = numeric_columns[col]
                elif col in datetime_columns:
                    col_type = datetime_columns[col]
                else:
                    col_type = "TEXT"
                column_definitions.append(
                    sql.SQL("{} {}").format(sql.Identifier(col), sql.SQL(col_type))
                )

            create_table_query = sql.SQL("CREATE TABLE {}.{} ({});").format(
                sql.Identifier(schema_name),
                sql.Identifier(table_name),
                sql.SQL(', ').join(column_definitions)
            )
            cursor.execute(create_table_query)

        # Serialize the DataFrame to an in-memory CSV for bulk loading.
        # na_rep must match the NULL marker in the COPY statement below:
        # pandas writes missing values as empty fields by default, which COPY
        # would not treat as NULL once NULL is set to 'NaN' and which are not
        # valid input for the numeric and date columns.
        csv_buffer = StringIO()
        df.to_csv(
            csv_buffer,
            index=False,
            header=False,
            na_rep='NaN',
            date_format='%Y-%m-%d %H:%M:%S'  # ensure the date format is correct
        )
        csv_buffer.seek(0)

        # Use COPY to insert the data from the StringIO buffer
        cursor.copy_expert(
            sql.SQL("COPY {}.{} FROM STDIN WITH CSV NULL 'NaN'").format(
                sql.Identifier(schema_name),
                sql.Identifier(table_name)
            ),
            csv_buffer
        )
finally:
    # Always release the connection, even when the load failed
    conn.close()

print(f"Data successfully written into {schema_name}.{table_name}")

  df[col] = pd.to_datetime(df[col], errors='coerce')  # Convert to datetime, setting invalid values as NaT


Data successfully written into datacollection.stock_data
