# PSQL Crendentials and Connections

In [3]:
import pandas as pd
from sqlalchemy import create_engine

# Database connection parameters
db_config = {
    'host': 'database-1.cb0gsmcyc0ns.us-west-1.rds.amazonaws.com',
    'port': 5432,
    'dbname': 'fitbit_data',
    'user': 'postgres',
    'password': 'Pass1234!'
}

In [2]:
conn_str = f"postgresql://{db_config['user']}:{db_config['password']}@{db_config['host']}:{db_config['port']}/{db_config['dbname']}"
engine = create_engine(conn_str)

# Creating the database

In [None]:
import psycopg2

# Connect to the default 'postgres' database to create a new one
conn = psycopg2.connect(
    dbname="postgres",
    user="postgres",
    password="Pass1234!",
    host="database-1.cb0gsmcyc0ns.us-west-1.rds.amazonaws.com",
    port="5432"
)
conn.autocommit = True  # Required to run CREATE DATABASE outside a transaction

cur = conn.cursor()

# Create a new database
db_name = "fitbit_data"
cur.execute(f"CREATE DATABASE {db_name};")

print(f"Database '{db_name}' created successfully!")

# Close the connection
cur.close()
conn.close()

# Creating MET and metric_explaination table

In [None]:
from sqlalchemy import text

# Execute SQL using text()
with engine.connect() as connection:
    connection.execute(text("""
    CREATE TABLE IF NOT EXISTS metric_explaination (
        metric_id SERIAL PRIMARY KEY,
        metric_name VARCHAR(100) NOT NULL,
        min_value FLOAT NOT NULL,
        max_value FLOAT NOT NULL,
        min_anamoly_desc VARCHAR(100) NOT NULL,
        max_anamoly_desc VARCHAR(100) NOT NULL
    );
    """))
    print("Table created successfully!")
    
    connection.commit()  


Table created successfully!


In [12]:
from sqlalchemy import create_engine, text

# Execute SQL using text()
with engine.connect() as connection:
    connection.execute(text("""
    CREATE TABLE IF NOT EXISTS met_data (
        met_min FLOAT NOT NULL,
        met_max FLOAT NOT NULL,
        likely_activity VARCHAR(100) NOT NULL
    );
    """))
    print("Table created successfully!")
    
    connection.commit()  

Table created successfully!


In [13]:
metrics = pd.read_csv("Metric Explanations - Sheet1.csv")

In [15]:
metrics.reset_index(inplace=True)

In [17]:
metrics.columns = ['metric_id', 'metric_name', 'min_value', 'max_value', 'min_anamoly_desc', 'max_anamoly_desc']

In [18]:
metrics.to_sql('metric_explaination', engine, index=False, if_exists='append')

5

In [21]:
met = pd.read_csv("Likely Activity - Sheet1.csv")
met.columns = ['met_min', 'met_max', 'likely_activity']
met.to_sql('met_data', engine, index=False, if_exists='append')

13

# Creating User Demographics and Daily Data table

In [18]:
from sqlalchemy import create_engine, text

# Execute SQL using text()
with engine.connect() as connection:
    connection.execute(text("""
    CREATE TABLE IF NOT EXISTS user_demographics (
        user_id         BIGINT PRIMARY KEY,   
        weight_kg       NUMERIC,
        weight_pounds   NUMERIC,
        bmi             NUMERIC,
        age             INT,
        smoker          BOOLEAN,
        drinker         BOOLEAN,
        updated_at      TIMESTAMP DEFAULT CURRENT_TIMESTAMP
    );
    """))
    print("Table created successfully!")
    
    connection.commit()  


Table created successfully!


In [None]:
with engine.connect() as connection:
    connection.execute(text("""
    CREATE TABLE IF NOT EXISTS daily_data (
            user_id             BIGINT NOT NULL,
            activity_date       DATE NOT NULL,
            total_steps         INTEGER,
            total_distance      NUMERIC,
            total_calories      INTEGER,
            total_sleep_minutes INTEGER,
            very_active_minutes INTEGER,
            fairly_active_minutes INTEGER,
            lightly_active_minutes INTEGER,
            sedentary_minutes   INTEGER,
            updated_at          TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
            PRIMARY KEY (user_id, activity_date),
            FOREIGN KEY (user_id) REFERENCES user_demographics(user_id)
        );
    """))
    print("Table created successfully!")
    connection.commit()

Table created successfully!


In [None]:
from datetime import datetime
import numpy as np

def process_and_insert_user_data():
    # Load weight data from 3 CSVs and combine them
    dataset_paths = ['archive/mturkfitbit_export_3.12.16-4.11.16/Fitabase Data 3.12.16-4.11.16/weightLogInfo_merged.csv',
                     'archive/mturkfitbit_export_4.12.16-5.12.16/Fitabase Data 4.12.16-5.12.16/weightLogInfo_merged.csv',
                     'synthetic_data/synthetic_data/synthetic_weightLog.csv']
    weight_dfs = []
    for paths in dataset_paths:
        try:
            df = pd.read_csv(paths)  # Adjust filenames as needed
            weight_dfs.append(df)
        except FileNotFoundError:
            print(f"File {paths} not found. Skipping.")
    
    # Combine all weight dataframes
    if weight_dfs:
        combined_weight = pd.concat(weight_dfs, ignore_index=True)
        
        # Convert date to datetime and ensure ID is treated as int
        combined_weight['Date'] = pd.to_datetime(combined_weight['Date'])
        combined_weight['Id'] = combined_weight['Id'].astype(int)
        
        # Get the latest weight data for each user
        latest_weight = combined_weight.sort_values('Date').groupby('Id').last().reset_index()
        
        # Rename columns to match our database schema
        latest_weight = latest_weight.rename(columns={
            'Id': 'user_id',
            'WeightKg': 'weight_kg',
            'WeightPounds': 'weight_pounds',
            'BMI': 'bmi'
        })
    else:
        print("No weight data found.")
        latest_weight = pd.DataFrame(columns=['user_id', 'weight_kg', 'weight_pounds', 'bmi'])
    
    # Load demographics data
    try:
        demographics = pd.read_csv('C:/Users/Chira/OneDrive/Desktop/DSC 202/synthetic_demographic_data.csv')  # Adjust filename as needed
        demographics['Id'] = demographics['Id'].astype(int)
        
        # Rename columns to match our schema
        demographics = demographics.rename(columns={
            'Id': 'user_id',
            'Age': 'age', 
            'Smoking': 'smoker',
            'Drinking': 'drinker'
        })
        
        # # Convert smoking and drinking to boolean
        demographics['smoker'] = demographics['smoker'].map({1: True, 0: False})
        demographics['drinker'] = demographics['drinker'].map({1: True, 0: False})
    except FileNotFoundError:
        print("Demographics file not found.")
        demographics = pd.DataFrame(columns=['user_id', 'age', 'smoker', 'drinker'])
    
    # Merge weight and demographics data on user_id
    if not latest_weight.empty and not demographics.empty:
        # Outer join to include all users from both datasets
        merged_data = pd.merge(
            latest_weight[['user_id', 'weight_kg', 'weight_pounds', 'bmi']], 
            demographics[['user_id', 'age', 'smoker', 'drinker']], 
            on='user_id',
            how='right'
        )
    elif not latest_weight.empty:
        merged_data = latest_weight[['user_id', 'weight_kg', 'weight_pounds', 'bmi']]
        merged_data['age'] = np.nan
        merged_data['smoker'] = None
        merged_data['drinker'] = None
    elif not demographics.empty:
        merged_data = demographics[['user_id', 'age', 'smoker', 'drinker']]
        merged_data['weight_kg'] = np.nan
        merged_data['weight_pounds'] = np.nan
        merged_data['bmi'] = np.nan
    else:
        print("No data available to insert.")
        return
    
    # Add updated_at column
    merged_data['updated_at'] = datetime.now()
    
    return merged_data, latest_weight, demographics

if __name__ == "__main__":
    merged_data,latest,demo = process_and_insert_user_data()

  combined_weight['Date'] = pd.to_datetime(combined_weight['Date'])


In [57]:
try:
        print(f"Inserting {len(merged_data)} records into user_demographics table...")
        merged_data.to_sql('user_demographics', engine, if_exists='append', index=False)
        print("Data insertion completed successfully.")
except Exception as e:
        print(f"Error inserting data: {e}")

Inserting 70 records into user_demographics table...
Data insertion completed successfully.


In [60]:
merged_data.to_csv('Merged_df.csv',index=False)

In [None]:
from datetime import datetime

def process_and_insert_daily_data():
    # Load and combine activity data from 3 CSVs
    paths = ['archive/mturkfitbit_export_3.12.16-4.11.16/Fitabase Data 3.12.16-4.11.16/dailyActivity_merged.csv', 
             'archive/mturkfitbit_export_4.12.16-5.12.16/Fitabase Data 4.12.16-5.12.16/dailyActivity_merged.csv',
             'synthetic_data/synthetic_data/synthetic_dailyActivity.csv']
    activity_dfs = []
    for path in paths:
        try:
            df = pd.read_csv(path)
            activity_dfs.append(df)
            print(f"Loaded {len(df)} rows from {path}")
        except FileNotFoundError:
            print(f"File {path} not found. Skipping.")
    
    if activity_dfs:
        # Combine all activity dataframes
        combined_activity = pd.concat(activity_dfs, ignore_index=True)
        
        # Convert date column and ensure ID is treated as int
        combined_activity['ActivityDate'] = pd.to_datetime(combined_activity['ActivityDate']).dt.date
        combined_activity['Id'] = combined_activity['Id'].astype(int)
        
        # Rename columns to match our database schema
        activity_data = combined_activity.rename(columns={
            'Id': 'user_id',
            'ActivityDate': 'activity_date',
            'TotalSteps': 'total_steps',
            'TotalDistance': 'total_distance',
            'Calories': 'total_calories',
            'VeryActiveMinutes': 'very_active_minutes',
            'LightlyActiveMinutes': 'lightly_active_minutes',
            'SedentaryMinutes': 'sedentary_minutes'
        })
        
        # Select only the columns we need
        activity_data = activity_data[[
            'user_id', 'activity_date', 'total_steps', 'total_distance', 
            'total_calories', 'very_active_minutes', 'lightly_active_minutes', 
            'sedentary_minutes'
        ]]
    else:
        print("No activity data found.")
        activity_data = pd.DataFrame(columns=[
            'user_id', 'activity_date', 'total_steps', 'total_distance', 
            'total_calories', 'very_active_minutes', 'lightly_active_minutes', 
            'sedentary_minutes'
        ])
    
    # Load and process sleep data from 3 CSVs
    sleep_dfs = []
    paths = ['archive/mturkfitbit_export_3.12.16-4.11.16/Fitabase Data 3.12.16-4.11.16/minuteSleep_merged.csv',
                 'archive/mturkfitbit_export_4.12.16-5.12.16/Fitabase Data 4.12.16-5.12.16/minuteSleep_merged.csv',
                 'synthetic_data/synthetic_data/synthetic_minuteSleep.csv']
    for path in paths:
        try:
            df = pd.read_csv(path)
            sleep_dfs.append(df)
            print(f"Loaded {len(df)} rows from {path}")
        except FileNotFoundError:
            print(f"File {path} not found. Skipping.")
    
    if sleep_dfs:
        # Combine all sleep dataframes
        combined_sleep = pd.concat(sleep_dfs, ignore_index=True)
        
        # Convert date column and ensure ID is treated as int
        combined_sleep['date'] = pd.to_datetime(combined_sleep['date']).dt.date
        combined_sleep['Id'] = combined_sleep['Id'].astype(int)
        
        # Sum up sleep minutes by user and date
        sleep_data = combined_sleep.groupby(['Id', 'date'])['value'].sum().reset_index()
        
        # Rename columns to match our schema
        sleep_data = sleep_data.rename(columns={
            'Id': 'user_id',
            'date': 'activity_date',
            'value': 'total_sleep_minutes'
        })
    else:
        print("No sleep data found.")
        sleep_data = pd.DataFrame(columns=['user_id', 'activity_date', 'total_sleep_minutes'])
    
    # Merge activity and sleep data
    if not activity_data.empty and not sleep_data.empty:
        # Outer join to include all records from both datasets
        merged_data = pd.merge(
            activity_data, 
            sleep_data, 
            on=['user_id', 'activity_date'],
            how='outer'
        )
    elif not activity_data.empty:
        merged_data = activity_data
        merged_data['total_sleep_minutes'] = None
    elif not sleep_data.empty:
        merged_data = sleep_data
        for col in ['total_steps', 'total_distance', 'total_calories', 
                    'very_active_minutes', 'lightly_active_minutes', 'sedentary_minutes']:
            merged_data[col] = None
    else:
        print("No data available to insert.")
        return
    
    # Add updated_at column
    merged_data['updated_at'] = datetime.now()
    
    # Handle potential duplicates by using the most complete record
    # Group by primary key and keep the record with the most non-null values
    if len(merged_data) > merged_data.drop_duplicates(subset=['user_id', 'activity_date']).shape[0]:
        print("Found potential duplicate records, resolving...")
        
        # Count non-null values for each record
        merged_data['non_null_count'] = merged_data.notnull().sum(axis=1)
        
        # Sort by count (descending) and take the first record for each user_id/date combination
        merged_data = merged_data.sort_values(['user_id', 'activity_date', 'non_null_count'], 
                                             ascending=[True, True, False])
        merged_data = merged_data.drop_duplicates(subset=['user_id', 'activity_date'])
        merged_data = merged_data.drop(columns=['non_null_count'])
    
    return merged_data
    
    

if __name__ == "__main__":
    merged_data = process_and_insert_daily_data()

           Id ActivityDate  TotalSteps  TotalDistance  TrackerDistance  \
0  1503960366    3/25/2016       11004           7.11             7.11   
1  1503960366    3/26/2016       17609          11.55            11.55   
2  1503960366    3/27/2016       12736           8.53             8.53   
3  1503960366    3/28/2016       13231           8.93             8.93   
4  1503960366    3/29/2016       12041           7.85             7.85   

   LoggedActivitiesDistance  VeryActiveDistance  ModeratelyActiveDistance  \
0                       0.0                2.57                      0.46   
1                       0.0                6.92                      0.73   
2                       0.0                4.66                      0.16   
3                       0.0                3.19                      0.79   
4                       0.0                2.16                      1.09   

   LightActiveDistance  SedentaryActiveDistance  VeryActiveMinutes  \
0                 4.07

In [None]:
# Insert data into PostgreSQL table
try:
    print(f"Inserting {len(merged_data)} records into daily_data table...")
    
    # Use on_conflict="do_nothing" to handle potential primary key violations
    merged_data.to_sql('daily_data', engine, if_exists='append', index=False,
                        method='multi', chunksize=1000)
    
    print("Data insertion completed successfully.")
except Exception as e:
    print(f"Error inserting data: {e}")

Inserting 2341 records into daily_data table...
Data insertion completed successfully.


In [71]:
merged_data.to_csv('daily_merged_data.csv',index=False)

# Creating hourly-data table

In [None]:
with engine.connect() as connection:
    connection.execute(text("""
    CREATE TABLE hourly_data (
        user_id         BIGINT NOT NULL,
        activity_hour   TIMESTAMP NOT NULL,
        steps           INTEGER,
        calories        INTEGER,
        sleep_minutes   INTEGER,
        intensity_level INTEGER,  
        updated_at      TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
        PRIMARY KEY (user_id, activity_hour),
        FOREIGN KEY (user_id) REFERENCES user_demographics(user_id)
        );
    """))
    print("Table created successfully!")
    connection.commit()

Table created successfully!


In [77]:
def process_and_insert_hourly_data():
    # Process hourly steps data
    steps_dfs = []
    paths = ['archive/mturkfitbit_export_3.12.16-4.11.16/Fitabase Data 3.12.16-4.11.16/hourlySteps_merged.csv',
                 'archive/mturkfitbit_export_4.12.16-5.12.16/Fitabase Data 4.12.16-5.12.16/hourlySteps_merged.csv',
                 'synthetic_data/synthetic_data/synthetic_hourlySteps.csv']
    for file_path in paths:
        try:
            df = pd.read_csv(file_path)
            steps_dfs.append(df)
            print(f"Loaded {len(df)} rows from {file_path}")
        except FileNotFoundError:
            print(f"File {file_path} not found. Skipping.")
    
    if steps_dfs:
        combined_steps = pd.concat(steps_dfs, ignore_index=True)
        combined_steps['ActivityHour'] = pd.to_datetime(combined_steps['ActivityHour'])
        combined_steps['Id'] = combined_steps['Id'].astype(int)
        steps_data = combined_steps.rename(columns={
            'Id': 'user_id',
            'ActivityHour': 'activity_hour',
            'StepTotal': 'steps'
        })
    else:
        print("No steps data found.")
        steps_data = pd.DataFrame(columns=['user_id', 'activity_hour', 'steps'])
    
    # Process hourly calories data
    calories_dfs = []
    paths = ['archive/mturkfitbit_export_3.12.16-4.11.16/Fitabase Data 3.12.16-4.11.16/hourlyCalories_merged.csv',
                 'archive/mturkfitbit_export_4.12.16-5.12.16/Fitabase Data 4.12.16-5.12.16/hourlyCalories_merged.csv',
                 'synthetic_data/synthetic_data/synthetic_hourlyCalories.csv']
    for file_path in paths:
        try:
            df = pd.read_csv(file_path)
            calories_dfs.append(df)
            print(f"Loaded {len(df)} rows from {file_path}")
        except FileNotFoundError:
            print(f"File {file_path} not found. Skipping.")
    
    if calories_dfs:
        combined_calories = pd.concat(calories_dfs, ignore_index=True)
        combined_calories['ActivityHour'] = pd.to_datetime(combined_calories['ActivityHour'])
        combined_calories['Id'] = combined_calories['Id'].astype(int)
        calories_data = combined_calories.rename(columns={
            'Id': 'user_id',
            'ActivityHour': 'activity_hour',
            'Calories': 'calories'
        })
    else:
        print("No calories data found.")
        calories_data = pd.DataFrame(columns=['user_id', 'activity_hour', 'calories'])
    
    # Process hourly intensity data
    intensity_dfs = []
    paths = ['archive/mturkfitbit_export_3.12.16-4.11.16/Fitabase Data 3.12.16-4.11.16/hourlyIntensities_merged.csv',
                 'archive/mturkfitbit_export_4.12.16-5.12.16/Fitabase Data 4.12.16-5.12.16/hourlyIntensities_merged.csv',
                 'synthetic_data/synthetic_data/synthetic_hourlyIntensities.csv']
    for file_path in paths:
        try:
            # file_path = f'hourlyIntensity_{i}.csv'
            df = pd.read_csv(file_path)
            intensity_dfs.append(df)
            print(f"Loaded {len(df)} rows from {file_path}")
        except FileNotFoundError:
            print(f"File {file_path} not found. Skipping.")
    
    if intensity_dfs:
        combined_intensity = pd.concat(intensity_dfs, ignore_index=True)
        combined_intensity['ActivityHour'] = pd.to_datetime(combined_intensity['ActivityHour'])
        combined_intensity['Id'] = combined_intensity['Id'].astype(int)
        # Use TotalIntensity as the intensity_level (you can change to AverageIntensity if preferred)
        intensity_data = combined_intensity.rename(columns={
            'Id': 'user_id',
            'ActivityHour': 'activity_hour',
            'TotalIntensity': 'intensity_level'
        })[['user_id', 'activity_hour', 'intensity_level']]
    else:
        print("No intensity data found.")
        intensity_data = pd.DataFrame(columns=['user_id', 'activity_hour', 'intensity_level'])
        
    # Process sleep data
    sleep_dfs = []
    paths = ['archive/mturkfitbit_export_3.12.16-4.11.16/Fitabase Data 3.12.16-4.11.16/minuteSleep_merged.csv',
                 'archive/mturkfitbit_export_4.12.16-5.12.16/Fitabase Data 4.12.16-5.12.16/minuteSleep_merged.csv',
                 'synthetic_data/synthetic_data/synthetic_minuteSleep.csv']
    for file_path in paths:
        try:
            df = pd.read_csv(file_path)
            sleep_dfs.append(df)
            print(f"Loaded {len(df)} rows from {file_path}")
        except FileNotFoundError:
            print(f"File {file_path} not found. Skipping.")
    
    if sleep_dfs:
        combined_sleep = pd.concat(sleep_dfs, ignore_index=True)
        # Convert date and ensure ID is treated as int
        combined_sleep['date'] = pd.to_datetime(combined_sleep['date'])
        combined_sleep['Id'] = combined_sleep['Id'].astype(int)
        
        # Extract hour from datetime and group by user, date and hour
        combined_sleep['hour'] = combined_sleep['date'].dt.floor('H')
        
        # Sum up sleep minutes by user and hour
        sleep_data = combined_sleep.groupby(['Id', 'hour'])['value'].sum().reset_index()
        
        # Rename columns to match our schema
        sleep_data = sleep_data.rename(columns={
            'Id': 'user_id',
            'hour': 'activity_hour',
            'value': 'sleep_minutes'
        })
    else:
        print("No sleep data found.")
        sleep_data = pd.DataFrame(columns=['user_id', 'activity_hour', 'sleep_minutes'])
    
    # Merge all dataframes
    # First, merge steps and calories
    if not steps_data.empty and not calories_data.empty:
        merged_data = pd.merge(
            steps_data, 
            calories_data, 
            on=['user_id', 'activity_hour'],
            how='outer'
        )
    elif not steps_data.empty:
        merged_data = steps_data
    elif not calories_data.empty:
        merged_data = calories_data
    else:
        merged_data = pd.DataFrame(columns=['user_id', 'activity_hour', 'steps', 'calories'])
    
    # Merge with intensity data
    if not intensity_data.empty:
        merged_data = pd.merge(
            merged_data,
            intensity_data,
            on=['user_id', 'activity_hour'],
            how='outer'
        )
    
    # Finally, merge with sleep data
    if not sleep_data.empty:
        merged_data = pd.merge(
            merged_data,
            sleep_data,
            on=['user_id', 'activity_hour'],
            how='outer'
        )
    
    # Add updated_at column
    merged_data['updated_at'] = datetime.now()
    
    # Handle potential duplicates by using the most complete record
    if len(merged_data) > merged_data.drop_duplicates(subset=['user_id', 'activity_hour']).shape[0]:
        print("Found potential duplicate records, resolving...")
        
        # Count non-null values for each record
        merged_data['non_null_count'] = merged_data.notnull().sum(axis=1)
        
        # Sort by count (descending) and take the first record for each user_id/hour combination
        merged_data = merged_data.sort_values(['user_id', 'activity_hour', 'non_null_count'], 
                                             ascending=[True, True, False])
        merged_data = merged_data.drop_duplicates(subset=['user_id', 'activity_hour'])
        merged_data = merged_data.drop(columns=['non_null_count'])
    
    return merged_data

if __name__ == "__main__":
    merged_data = process_and_insert_hourly_data()

Loaded 24084 rows from archive/mturkfitbit_export_3.12.16-4.11.16/Fitabase Data 3.12.16-4.11.16/hourlySteps_merged.csv
Loaded 22099 rows from archive/mturkfitbit_export_4.12.16-5.12.16/Fitabase Data 4.12.16-5.12.16/hourlySteps_merged.csv
Loaded 24084 rows from synthetic_data/synthetic_data/synthetic_hourlySteps.csv


  combined_steps['ActivityHour'] = pd.to_datetime(combined_steps['ActivityHour'])


Loaded 24084 rows from archive/mturkfitbit_export_3.12.16-4.11.16/Fitabase Data 3.12.16-4.11.16/hourlyCalories_merged.csv
Loaded 22099 rows from archive/mturkfitbit_export_4.12.16-5.12.16/Fitabase Data 4.12.16-5.12.16/hourlyCalories_merged.csv
Loaded 24084 rows from synthetic_data/synthetic_data/synthetic_hourlyCalories.csv


  combined_calories['ActivityHour'] = pd.to_datetime(combined_calories['ActivityHour'])


Loaded 24084 rows from archive/mturkfitbit_export_3.12.16-4.11.16/Fitabase Data 3.12.16-4.11.16/hourlyIntensities_merged.csv
Loaded 22099 rows from archive/mturkfitbit_export_4.12.16-5.12.16/Fitabase Data 4.12.16-5.12.16/hourlyIntensities_merged.csv
Loaded 24084 rows from synthetic_data/synthetic_data/synthetic_hourlyIntensities.csv


  combined_intensity['ActivityHour'] = pd.to_datetime(combined_intensity['ActivityHour'])


Loaded 198559 rows from archive/mturkfitbit_export_3.12.16-4.11.16/Fitabase Data 3.12.16-4.11.16/minuteSleep_merged.csv
Loaded 188521 rows from archive/mturkfitbit_export_4.12.16-5.12.16/Fitabase Data 4.12.16-5.12.16/minuteSleep_merged.csv
Loaded 198559 rows from synthetic_data/synthetic_data/synthetic_minuteSleep.csv


  combined_sleep['hour'] = combined_sleep['date'].dt.floor('H')


Found potential duplicate records, resolving...


In [80]:
try:
    print(f"Inserting {len(merged_data)} records into hourly_data table...")
    
    # Use pd.to_sql with a chunksize for large datasets
    merged_data.to_sql('hourly_data', engine, if_exists='append', index=False,
                        method='multi', chunksize=1000)
    
    print("Data insertion completed successfully.")
except Exception as e:
    print(f"Error inserting data: {e}")

Inserting 70434 records into hourly_data table...
Data insertion completed successfully.


In [81]:
merged_data.to_csv('hourly_merged_data.csv',index=False)

# Creating Minute-level data table

In [82]:
with engine.connect() as connection:
    connection.execute(text("""
   CREATE TABLE minute_data (
        user_id         BIGINT NOT NULL,
        activity_minute TIMESTAMP NOT NULL,
        steps           INTEGER,
        intensity       INTEGER,
        mets            NUMERIC,
        sleep           INTEGER,
        heart_rate      INTEGER,  -- Added heart rate column
        updated_at      TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
        PRIMARY KEY (user_id, activity_minute),
        FOREIGN KEY (user_id) REFERENCES user_demographics(user_id)
    );
    """))
    print("Table created successfully!")
    connection.commit()

Table created successfully!


In [108]:
def process_and_insert_minute_data():
    datasets = {
        'steps': {
            'files': ['archive/mturkfitbit_export_3.12.16-4.11.16/Fitabase Data 3.12.16-4.11.16/minuteStepsNarrow_merged.csv', 
                      'archive/mturkfitbit_export_4.12.16-5.12.16/Fitabase Data 4.12.16-5.12.16/minuteStepsNarrow_merged.csv', 
                      'synthetic_data/synthetic_data/synthetic_minuteSteps.csv'],
            'time_col': 'ActivityMinute',
            'value_col': 'Steps',
            'target_col': 'steps'
        },
        'intensity': {
            'files': ['archive/mturkfitbit_export_3.12.16-4.11.16/Fitabase Data 3.12.16-4.11.16/minuteIntensitiesNarrow_merged.csv', 
                      'archive/mturkfitbit_export_4.12.16-5.12.16/Fitabase Data 4.12.16-5.12.16/minuteIntensitiesNarrow_merged.csv', 
                      'synthetic_data/synthetic_data/synthetic_minuteIntensities.csv'],
            'time_col': 'ActivityMinute',
            'value_col': 'Intensity',
            'target_col': 'intensity'
        },
        'mets': {
            'files': ['archive/mturkfitbit_export_3.12.16-4.11.16/Fitabase Data 3.12.16-4.11.16/minuteMETsNarrow_merged.csv', 
                      'archive/mturkfitbit_export_4.12.16-5.12.16/Fitabase Data 4.12.16-5.12.16/minuteMETsNarrow_merged.csv', 
                      'synthetic_data/synthetic_data/synthetic_minuteMETs.csv'],
            'time_col': 'ActivityMinute',
            'value_col': 'METs',
            'target_col': 'mets'
        },
        'sleep': {
            'files': ['archive/mturkfitbit_export_3.12.16-4.11.16/Fitabase Data 3.12.16-4.11.16/minuteSleep_merged.csv', 
                      'archive/mturkfitbit_export_4.12.16-5.12.16/Fitabase Data 4.12.16-5.12.16/minuteSleep_merged.csv', 
                      'synthetic_data/synthetic_data/synthetic_minuteSleep.csv'],
            'time_col': 'date',
            'value_col': 'value',
            'target_col': 'sleep'
        }
    }
    
    # Process each dataset type
    processed_dfs = {}
    
    for data_type, config in datasets.items():
        dfs = []
        for file_path in config['files']:
            try:
                df = pd.read_csv(file_path)
                dfs.append(df)
                print(f"Loaded {len(df)} rows from {file_path}")
            except FileNotFoundError:
                print(f"File {file_path} not found. Skipping.")
        
        if dfs:
            combined = pd.concat(dfs, ignore_index=True)
            combined['Id'] = combined['Id'].astype(int)
            combined[config['time_col']] = pd.to_datetime(combined[config['time_col']], format='mixed')
            
            # Rename columns to match our schema
            renamed = combined.rename(columns={
                'Id': 'user_id',
                config['time_col']: 'activity_minute',
                config['value_col']: config['target_col']
            })
            
            # Select only the columns we need
            processed_dfs[data_type] = renamed[['user_id', 'activity_minute', config['target_col']]]
        else:
            print(f"No {data_type} data found.")
            processed_dfs[data_type] = pd.DataFrame(columns=['user_id', 'activity_minute', config['target_col']])
    
    # Process heart rate data separately because it's per second and needs aggregation
    heart_dfs = []
    paths = ['archive/mturkfitbit_export_3.12.16-4.11.16/Fitabase Data 3.12.16-4.11.16/heartrate_seconds_merged.csv',
             'archive/mturkfitbit_export_4.12.16-5.12.16/Fitabase Data 4.12.16-5.12.16/heartrate_seconds_merged.csv',
             'synthetic_data/synthetic_data/synthetic_heartrate.csv']
    for file_path in paths:  # Assuming you have 3 heart rate files
        try:
            df = pd.read_csv(file_path)
            heart_dfs.append(df)
            print(f"Loaded {len(df)} rows from {file_path}")
        except FileNotFoundError:
            print(f"File {file_path} not found. Skipping.")
    
    if heart_dfs:
        combined_heart = pd.concat(heart_dfs, ignore_index=True)
        combined_heart['Id'] = combined_heart['Id'].astype(int)
        combined_heart['Time'] = pd.to_datetime(combined_heart['Time'], format='mixed')  
        
        # Create a minute-level timestamp by truncating seconds
        combined_heart['activity_minute'] = combined_heart['Time'].dt.floor('min')
        
        # Group by user and minute, taking max heart rate
        heart_rate_data = combined_heart.groupby(['Id', 'activity_minute'])['Value'].max().reset_index()
        
        # Rename columns
        heart_rate_data = heart_rate_data.rename(columns={
            'Id': 'user_id',
            'Value': 'heart_rate'
        })
    else:
        print("No heart rate data found.")
        heart_rate_data = pd.DataFrame(columns=['user_id', 'activity_minute', 'heart_rate'])
    
    # Begin merging all datasets
    # Start with steps data if available, otherwise create empty DataFrame
    if 'steps' in processed_dfs and not processed_dfs['steps'].empty:
        merged_data = processed_dfs['steps']
    else:
        merged_data = pd.DataFrame(columns=['user_id', 'activity_minute', 'steps'])
    
    # Merge with each other dataset
    for data_type in ['intensity', 'mets', 'sleep']:
        if data_type in processed_dfs and not processed_dfs[data_type].empty:
            merged_data = pd.merge(
                merged_data,
                processed_dfs[data_type],
                on=['user_id', 'activity_minute'],
                how='outer'
            )
    
    # Merge with heart rate data
    if not heart_rate_data.empty:
        merged_data = pd.merge(
            merged_data,
            heart_rate_data,
            on=['user_id', 'activity_minute'],
            how='outer'
        )
    
    # Add updated_at column
    merged_data['updated_at'] = datetime.now()
    
    # Handle potential duplicates
    if len(merged_data) > merged_data.drop_duplicates(subset=['user_id', 'activity_minute']).shape[0]:
        print("Found potential duplicate records, resolving...")
        
        # Count non-null values for each record
        merged_data['non_null_count'] = merged_data.notnull().sum(axis=1)
        
        # Sort by count (descending) and take the first record for each user_id/minute combination
        merged_data = merged_data.sort_values(
            ['user_id', 'activity_minute', 'non_null_count'], 
            ascending=[True, True, False]
        )
        merged_data = merged_data.drop_duplicates(subset=['user_id', 'activity_minute'])
        merged_data = merged_data.drop(columns=['non_null_count'])
    
    return merged_data

if __name__ == "__main__":
    result = process_and_insert_minute_data()

Loaded 1445040 rows from archive/mturkfitbit_export_3.12.16-4.11.16/Fitabase Data 3.12.16-4.11.16/minuteStepsNarrow_merged.csv
Loaded 1325580 rows from archive/mturkfitbit_export_4.12.16-5.12.16/Fitabase Data 4.12.16-5.12.16/minuteStepsNarrow_merged.csv
Loaded 1445040 rows from synthetic_data/synthetic_data/synthetic_minuteSteps.csv
Loaded 1445040 rows from archive/mturkfitbit_export_3.12.16-4.11.16/Fitabase Data 3.12.16-4.11.16/minuteIntensitiesNarrow_merged.csv
Loaded 1325580 rows from archive/mturkfitbit_export_4.12.16-5.12.16/Fitabase Data 4.12.16-5.12.16/minuteIntensitiesNarrow_merged.csv
Loaded 1445040 rows from synthetic_data/synthetic_data/synthetic_minuteIntensities.csv
Loaded 1445040 rows from archive/mturkfitbit_export_3.12.16-4.11.16/Fitabase Data 3.12.16-4.11.16/minuteMETsNarrow_merged.csv
Loaded 1325580 rows from archive/mturkfitbit_export_4.12.16-5.12.16/Fitabase Data 4.12.16-5.12.16/minuteMETsNarrow_merged.csv
Loaded 1445040 rows from synthetic_data/synthetic_data/synth

In [112]:
 # Insert data into PostgreSQL table
try:
    print(f"Inserting {len(result)} records into minute_data table...")
    
    # Use chunking for large datasets
    chunk_size = 10000  # Adjust based on your system's capabilities
    total_rows = len(result)
    
    for i in range(0, total_rows, chunk_size):
        chunk = result.iloc[i:i + chunk_size]
        chunk.to_sql('minute_data', engine, if_exists='append', index=False)
        print(f"Inserted chunk {i//chunk_size + 1} ({len(chunk)} rows)")
    
    print("Data insertion completed successfully.")
except Exception as e:
    print(f"Error inserting data: {e}")
    # Consider more detailed error handling if needed

Inserting 4436505 records into minute_data table...
Inserted chunk 1 (10000 rows)
Inserted chunk 2 (10000 rows)
Inserted chunk 3 (10000 rows)
Inserted chunk 4 (10000 rows)
Inserted chunk 5 (10000 rows)
Inserted chunk 6 (10000 rows)
Inserted chunk 7 (10000 rows)
Inserted chunk 8 (10000 rows)
Inserted chunk 9 (10000 rows)
Inserted chunk 10 (10000 rows)
Inserted chunk 11 (10000 rows)
Inserted chunk 12 (10000 rows)
Inserted chunk 13 (10000 rows)
Inserted chunk 14 (10000 rows)
Inserted chunk 15 (10000 rows)
Inserted chunk 16 (10000 rows)
Inserted chunk 17 (10000 rows)
Inserted chunk 18 (10000 rows)
Inserted chunk 19 (10000 rows)
Inserted chunk 20 (10000 rows)
Inserted chunk 21 (10000 rows)
Inserted chunk 22 (10000 rows)
Inserted chunk 23 (10000 rows)
Inserted chunk 24 (10000 rows)
Inserted chunk 25 (10000 rows)
Inserted chunk 26 (10000 rows)
Inserted chunk 27 (10000 rows)
Inserted chunk 28 (10000 rows)
Inserted chunk 29 (10000 rows)
Inserted chunk 30 (10000 rows)
Inserted chunk 31 (10000 ro

In [113]:
result.to_csv('minute_merged_data.csv',index=False)