In [26]:
pip install pandas mysql-connector-python

You should consider upgrading via the '/Library/Frameworks/Python.framework/Versions/3.9/bin/python3.9 -m pip install --upgrade pip' command.[0m
Note: you may need to restart the kernel to use updated packages.


In [23]:
import pandas as pd
import re
from data201 import db_connection
from mysql.connector import errorcode

In [24]:
# Load the CSV file
df = pd.read_csv('Air_pollution.csv')

In [25]:
# Transform: Select relevant columns and limit to 100 rows
df_100 = df.head(100)
print(df_100)

   Unique ID   District        Date          Measure  Longitude  Latitude  \
0   01BBF6AD     Mumbai  01-12-2010        NO2 (ppb)    72.8777   19.0760   
1   00ED349D    Kolkata  01-10-2011     PM10 (µg/m³)    88.3639   22.5726   
2   01EA78C9  Bengaluru  01-12-2008     PM10 (µg/m³)    77.5946   12.9716   
3   0164FDF7    Chennai  01-08-2014        NO2 (ppb)    80.2707   13.0827   
4   0244B8FE    Chennai  01-07-2013        NO2 (ppb)    80.2707   13.0827   
..       ...        ...         ...              ...        ...       ...   
95  00D2085F     Mumbai  01-04-2000        NO2 (ppb)    72.8777   19.0760   
96  00440C22  Bengaluru  01-08-2005  Ozone (O3, ppb)    77.5946   12.9716   
97  023B3084    Kolkata  01-03-2019        NO2 (ppb)    88.3639   22.5726   
98  00F3A410     Mumbai  01-11-2021  Ozone (O3, ppb)    72.8777   19.0760   
99  0099F5ED    Kolkata  01-04-2001  Ozone (O3, ppb)    88.3639   22.5726   

   Time Period  Data Value Air Quality  
0      Monthly          32    Mode

In [26]:
conn = db_connection(config_file = 'joinforce.ini')
cursor = conn.cursor()

In [27]:
# Create District table (One-to-One relationship with Location)
    
sql = ( """
    CREATE TABLE IF NOT EXISTS district (
        district_id INT AUTO_INCREMENT PRIMARY KEY,
        district_name VARCHAR(100) NOT NULL UNIQUE
    )""")

cursor.execute(sql);

In [28]:
# Create Location table (One-to-One relationship with District)
#doesn't work if district_id is used as primary key for multiple tables:https://stackoverflow.com/questions/55631622/can-i-use-one-same-primary-key-in-two-different-tables
    
cursor.execute('DROP TABLE IF EXISTS location')
sql = ( """CREATE TABLE location (
        location_id INT AUTO_INCREMENT PRIMARY KEY,
        longitude FLOAT,
        latitude FLOAT,
        district_id INT,
        FOREIGN KEY (district_id) REFERENCES district(district_id) ON DELETE CASCADE
        )""")

cursor.execute(sql);

In [29]:
# Create Pollutant table(one-to-many relationship with District))

sql = ("""CREATE TABLE IF NOT EXISTS pollutant (
        pollutant_id INT AUTO_INCREMENT PRIMARY KEY,
        measure VARCHAR(50))""")

cursor.execute(sql);

In [30]:
# Create AirQuality table (Many-to-Many relationship between AirQuality and Pollutant)

sql = ("""CREATE TABLE IF NOT EXISTS air_quality (
        aq_id VARCHAR(70) CHARACTER SET utf8mb4 COLLATE utf8mb4_unicode_ci PRIMARY KEY,
        district_id INT ,
        date DATE NOT NULL,
        measure VARCHAR(50) NOT NULL,
        data_value FLOAT NOT NULL,
        air_quality VARCHAR(70) NOT NULL,
        FOREIGN KEY (district_id) REFERENCES district(district_id) ON DELETE CASCADE);""")

cursor.execute(sql);

In [31]:
# Create AQPollutant table 

cursor.execute("DROP TABLE IF EXISTS aq_pollutant")

sql = """
CREATE TABLE IF NOT EXISTS aq_pollutant (
    aq_id VARCHAR(70) CHARACTER SET utf8mb4 COLLATE utf8mb4_unicode_ci,
    pollutant_id INT,
    PRIMARY KEY (aq_id, pollutant_id),
    FOREIGN KEY (aq_id) REFERENCES air_quality(aq_id) ON DELETE CASCADE,
    FOREIGN KEY (pollutant_id) REFERENCES pollutant(pollutant_id) ON DELETE CASCADE
);
"""

cursor.execute(sql)

In [32]:
# SQL query to insert data in District
    
try:
    if conn.is_connected():
        cursor = conn.cursor()
        if 'District' not in df.columns:
            raise ValueError("Column 'district_name' not found in the DataFrame.")

        # Convert to list of tuples
        unique_districts = df_100['District'].dropna().unique().tolist()
        print(f"this is {unique_districts}")
        all_district_name = [(district,) for district in unique_districts]
        print(f"Districts to Insert: {all_district_name}")
        if not all_district_name:
            raise ValueError(" No valid district names found for insertion.")
            
        insert_query = "INSERT INTO district (district_name) VALUES (%s)"
        cursor.executemany(insert_query, all_district_name)
        conn.commit()
        print(f"Successfully inserted district: {cursor.rowcount}")

except Exception as e:
    conn.rollback()
    print(f"Error while inserting data: {e}")

finally:
    # Close the connection
    cursor.close()
    conn.close()
    print("MySQL connection is closed.")

this is ['Mumbai', 'Kolkata', 'Bengaluru', 'Chennai', 'Delhi']
Districts to Insert: [('Mumbai',), ('Kolkata',), ('Bengaluru',), ('Chennai',), ('Delhi',)]
Successfully inserted district: 5
MySQL connection is closed.


In [33]:
#reconnect to db
conn = db_connection(config_file = 'joinforce.ini')

# SQL query to insert data in Location

try:

    cursor = conn.cursor()

    # Drop NaN values
    df_cleaned = df_100.dropna(subset=['Longitude', 'Latitude', 'District'])

    # Fetch district_id for each district_name
    district_map = {}
    cursor.execute("SELECT district_id, district_name FROM district")
    for district_id, district_name in cursor.fetchall():
        district_map[district_name] = district_id  # Create a mapping

    # Map district_name to district_id
    df_cleaned['district_id'] = df_cleaned['District'].map(district_map)

    # Remove rows where district_id is NULL (invalid mapping)
    df_cleaned = df_cleaned.dropna(subset=['district_id'])

    # Convert district_id to int (ensure it's not an object type)
    df_cleaned['district_id'] = df_cleaned['district_id'].astype(int)

    # Convert DataFrame to list of tuples
    data = list(df_cleaned[['Longitude', 'Latitude', 'district_id']].itertuples(index=False, name=None))

    print(f"Data to Insert: {data}")

    if not data:
        raise ValueError("No valid data found for insertion.")

    # Corrected Insert Query (including district_id)
    insert_query = "INSERT INTO location (longitude, latitude, district_id) VALUES (%s, %s, %s)"
    cursor.executemany(insert_query, data)
    conn.commit()

    print(f"Successfully inserted {cursor.rowcount} rows.")

except Exception as e:
    conn.rollback()
    print(f"Error while inserting data: {e}")

finally:
    cursor.close()
    conn.close()
    print("MySQL connection is closed.")


Data to Insert: [(72.8777, 19.076, 1), (88.3639, 22.5726, 2), (77.5946, 12.9716, 3), (80.2707, 13.0827, 4), (80.2707, 13.0827, 4), (77.1025, 28.7041, 5), (88.3639, 22.5726, 2), (77.5946, 12.9716, 3), (77.5946, 12.9716, 3), (88.3639, 22.5726, 2), (72.8777, 19.076, 1), (77.5946, 12.9716, 3), (77.1025, 28.7041, 5), (88.3639, 22.5726, 2), (88.3639, 22.5726, 2), (88.3639, 22.5726, 2), (88.3639, 22.5726, 2), (77.1025, 28.7041, 5), (88.3639, 22.5726, 2), (80.2707, 13.0827, 4), (72.8777, 19.076, 1), (77.5946, 12.9716, 3), (88.3639, 22.5726, 2), (80.2707, 13.0827, 4), (88.3639, 22.5726, 2), (77.5946, 12.9716, 3), (72.8777, 19.076, 1), (72.8777, 19.076, 1), (77.5946, 12.9716, 3), (72.8777, 19.076, 1), (72.8777, 19.076, 1), (77.5946, 12.9716, 3), (77.1025, 28.7041, 5), (88.3639, 22.5726, 2), (77.5946, 12.9716, 3), (77.5946, 12.9716, 3), (80.2707, 13.0827, 4), (77.1025, 28.7041, 5), (72.8777, 19.076, 1), (77.1025, 28.7041, 5), (88.3639, 22.5726, 2), (80.2707, 13.0827, 4), (88.3639, 22.5726, 2), (7

In [34]:
#reconnect to db
conn = db_connection(config_file = 'joinforce.ini')

# SQL query to insert data in Pollutant

try:
    
    if conn.is_connected():
        cursor = conn.cursor()

        # Ensure required columns exist
        if 'Longitude' not in df_100.columns or 'Latitude' not in df_100.columns:
            raise ValueError("Required columns ('Longitude', 'Latitude') not found in the DataFrame.")
            
            print(df_100['Measure'].dtype)


        # Convert DataFrame to list of tuples 
        data = [(measure,) for measure in df_100['Measure'].astype(str)]

        print(f"Data to Insert: {data}")

        if not data:
            raise ValueError("No valid data found for insertion.")

        # Insert data into MySQL
        insert_query = "INSERT INTO pollutant (measure) VALUES (%s)"
        cursor.executemany(insert_query, data)
        conn.commit()

        print(f"Successfully inserted {cursor.rowcount} rows.")

except Exception as e:
    conn.rollback()
    print(f"Error while inserting data: {e}")

finally:
    cursor.close()
    conn.close()
    print("MySQL connection is closed.")


Data to Insert: [('NO2 (ppb)',), ('PM10 (µg/m³)',), ('PM10 (µg/m³)',), ('NO2 (ppb)',), ('NO2 (ppb)',), ('SO2 (ppb)',), ('Ozone (O3, ppb)',), ('NO2 (ppb)',), ('PM2.5 (µg/m³)',), ('PM2.5 (µg/m³)',), ('SO2 (ppb)',), ('PM2.5 (µg/m³)',), ('PM10 (µg/m³)',), ('PM10 (µg/m³)',), ('NO2 (ppb)',), ('PM2.5 (µg/m³)',), ('PM2.5 (µg/m³)',), ('PM10 (µg/m³)',), ('PM10 (µg/m³)',), ('NO2 (ppb)',), ('SO2 (ppb)',), ('NO2 (ppb)',), ('PM2.5 (µg/m³)',), ('PM10 (µg/m³)',), ('PM2.5 (µg/m³)',), ('PM2.5 (µg/m³)',), ('SO2 (ppb)',), ('PM2.5 (µg/m³)',), ('PM10 (µg/m³)',), ('PM2.5 (µg/m³)',), ('PM2.5 (µg/m³)',), ('SO2 (ppb)',), ('Ozone (O3, ppb)',), ('SO2 (ppb)',), ('PM10 (µg/m³)',), ('PM2.5 (µg/m³)',), ('Ozone (O3, ppb)',), ('PM10 (µg/m³)',), ('Ozone (O3, ppb)',), ('PM10 (µg/m³)',), ('PM10 (µg/m³)',), ('PM2.5 (µg/m³)',), ('Ozone (O3, ppb)',), ('PM2.5 (µg/m³)',), ('Ozone (O3, ppb)',), ('PM2.5 (µg/m³)',), ('PM10 (µg/m³)',), ('SO2 (ppb)',), ('Ozone (O3, ppb)',), ('Ozone (O3, ppb)',), ('Ozone (O3, ppb)',), ('PM10 (µg/m³)

In [35]:
#reconnect to db
conn = db_connection(config_file = 'joinforce.ini')

# SQL query to insert data in Air Quality
try:
    if conn.is_connected():
        cursor = conn.cursor()

        # Ensure required columns exist
        required_columns = {'Unique ID','Measure', 'Date', 'Data Value', 'Air Quality', 'District'}
        if not required_columns.issubset(df_100.columns):
            raise ValueError(f"Required columns {required_columns} not found in the DataFrame.")

        # Convert 'Date' column to MySQL format (YYYY-MM-DD)
        df_100['Date'] = pd.to_datetime(df_100['Date'], errors='coerce').dt.strftime('%Y-%m-%d')

        #check for "" string before dropping
        df_100['Unique ID'].replace("", None, inplace=True)
        
        # Drop NaN values
        df_cleaned = df_100.dropna(subset=['Unique ID','Date', 'Measure', 'Data Value', 'Air Quality', 'District'])

        #check aq_id
        print("Max length of aq_id:", df_cleaned['Unique ID'].astype(str).str.len().max())
        df_cleaned['Unique ID'] = df_cleaned['Unique ID'].astype(str).str.strip()
        print(df_cleaned[['Unique ID']].dtypes)

        df_cleaned['Unique ID'] = df_cleaned['Unique ID'].apply(lambda x: x.encode('utf-8').decode('utf-8'))  # Normalize encoding
        df_cleaned['Unique ID'] = df_cleaned['Unique ID'].apply(lambda x: re.sub(r'[^\x20-\x7E]', '', x))  # Remove non-ASCII chars
        print(df_cleaned[['Unique ID']])
              
        invalid_chars = df_cleaned['Unique ID'].apply(lambda x: any(ord(c) < 32 or ord(c) > 126 for c in x))
        if invalid_chars.any():
            print("Warning: Some 'Unique ID' values contain hidden special characters!")
            print(df_cleaned.loc[invalid_chars, 'Unique ID'])

        
        # Fetch  for each district_name
        district_map = {}
        cursor.execute("SELECT district_id, district_name FROM district")
        for district_id, district_name in cursor.fetchall():
            district_map[district_name] = district_id  # Create a mapping

        # Map district_name to district_id
        df_cleaned['district_id'] = df_cleaned['District'].map(district_map)

        # Remove rows where district_id is still NULL (invalid mapping)
        df_cleaned = df_cleaned.dropna(subset=['district_id'])

        # Convert DataFrame to list of tuples
        data = list(df_cleaned[['Unique ID','district_id', 'Date', 'Measure', 'Data Value', 'Air Quality']].itertuples(index=False, name=None))

        print(f"Data to Insert: {data}")

        if not data:
            raise ValueError("No valid data found for insertion.")

        # Corrected Insert Query (including district_id)
        insert_query = "INSERT INTO air_quality (aq_id, district_id, date, measure, data_value, air_quality) VALUES (CAST(%s AS CHAR), %s, %s, %s, %s, %s)"
        cursor.executemany(insert_query, data)
        conn.commit()

        print(f"Successfully inserted {cursor.rowcount} rows.")

except Exception as e:
    conn.rollback()
    print(f"Error while inserting data: {e}")

finally:
    cursor.close()
    conn.close()
    print("MySQL connection is closed.")


Max length of aq_id: 8
Unique ID    object
dtype: object
   Unique ID
0   01BBF6AD
1   00ED349D
2   01EA78C9
3   0164FDF7
4   0244B8FE
..       ...
95  00D2085F
96  00440C22
97  023B3084
98  00F3A410
99  0099F5ED

[100 rows x 1 columns]
Data to Insert: [('01BBF6AD', 1, '2010-01-12', 'NO2 (ppb)', 32, 'Moderate'), ('00ED349D', 2, '2011-01-10', 'PM10 (µg/m³)', 107, 'Poor'), ('01EA78C9', 3, '2008-01-12', 'PM10 (µg/m³)', 178, 'Very Poor'), ('0164FDF7', 4, '2014-01-08', 'NO2 (ppb)', 49, 'Moderate'), ('0244B8FE', 4, '2013-01-07', 'NO2 (ppb)', 9, 'Good'), ('01FF75BF', 5, '2018-01-04', 'SO2 (ppb)', 37, 'Moderate'), ('00EF0055', 2, '2011-01-02', 'Ozone (O3, ppb)', 45, 'Good'), ('01B78829', 3, '2007-01-12', 'NO2 (ppb)', 34, 'Moderate'), ('014F4102', 3, '2017-01-05', 'PM2.5 (µg/m³)', 58, 'Moderate'), ('00EC0414', 2, '2022-01-12', 'PM2.5 (µg/m³)', 173, 'Very Poor'), ('0083025A', 1, '2004-01-05', 'SO2 (ppb)', 44, 'Moderate'), ('021B3031', 3, '2009-01-08', 'PM2.5 (µg/m³)', 65, 'Moderate'), ('005503F2

A value is trying to be set on a copy of a slice from a DataFrame.
Try using .loc[row_indexer,col_indexer] = value instead

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy
  df_100['Date'] = pd.to_datetime(df_100['Date'], errors='coerce').dt.strftime('%Y-%m-%d')


In [36]:
#reconnect to db
conn = db_connection(config_file = 'joinforce.ini')

# SQL query to insert data in Aq Pollutant
try:
    if conn.is_connected():
        cursor = conn.cursor()

        # Fetch mapping for aq_id (from air_quality table) - aq_id is a string!
        aq_map = {}
        cursor.execute("SELECT aq_id FROM air_quality")
        for (aq_id,) in cursor.fetchall(): 
            aq_map[aq_id] = aq_id  # Store as string

        # Fetch mapping for pollutant_id (from pollutant table)
        pollutant_map = {}
        cursor.execute("SELECT pollutant_id, measure FROM pollutant")  
        for pollutant_id, measure in cursor.fetchall(): 
            pollutant_map[measure] = pollutant_id  # Store as integer

        
        # Ensure required columns exist
        required_columns = {'Unique ID', 'Measure'}
        if not required_columns.issubset(df_100.columns):
            raise ValueError(f"Required columns {required_columns} not found in the DataFrame.")


        # Create a cleaned dataframe copy
        df_cleaned = df_100.copy()

        # Map AQ IDs (keep them as strings)
        df_cleaned['aq_id'] = df_cleaned['Unique ID'].map(aq_map)
      

        # Ensure 'Measure' column exists before mapping to pollutant_id
        if 'Measure' in df_cleaned.columns:
            df_cleaned['pollutant_id'] = df_cleaned['Measure'].map(pollutant_map)
        else:
            raise ValueError("'Measure' column missing in DataFrame")


        # Handle NaN values in pollutant_id
        df_cleaned = df_cleaned.dropna(subset=['aq_id', 'pollutant_id'])
        
        # Convert pollutant_id to integer (aq_id remains a string)
        df_cleaned['pollutant_id'] = df_cleaned['pollutant_id'].astype(int)

        # Convert DataFrame to list of tuples
        data = list(df_cleaned[['aq_id', 'pollutant_id']].itertuples(index=False, name=None))


        if not data:
            raise ValueError("No valid data found for insertion.")

        # Insert Query (aq_id remains string)
        insert_query = "INSERT INTO aq_pollutant (aq_id, pollutant_id) VALUES (%s, %s)"
        cursor.executemany(insert_query, data)
        conn.commit()

        print(f"Successfully inserted {cursor.rowcount} rows.")

except Exception as e:
    conn.rollback()
    print(f"Error while inserting data: {e}")

finally:
    cursor.close()
    conn.close()
    print("MySQL connection is closed.")

Successfully inserted 100 rows.
MySQL connection is closed.


In [38]:
#reconnect to db
conn = db_connection(config_file = 'joinforce.ini')

#  Query to fetch first 25 rows
try:
    if conn.is_connected():
        cursor = conn.cursor()

        query = "SELECT * FROM air_quality LIMIT 25;"
        cursor.execute(query)
        rows = cursor.fetchall()

        # Get column names from cursor description
        column_names = [desc[0] for desc in cursor.description]

 
        df_air_quality = pd.DataFrame(rows, columns=column_names)

        print(df_air_quality)

except Exception as e:
    print(f"Error while executing queries: {e}")

finally:
    cursor.close()
    conn.close()
    print("MySQL connection is closed.")

       aq_id  district_id        date          measure  data_value air_quality
0   00017E19            2  2019-01-09     PM10 (µg/m³)       156.0   Very Poor
1   00142C5A            1  2015-01-06  Ozone (O3, ppb)        46.0        Good
2   00275230            4  2018-01-11  Ozone (O3, ppb)        34.0        Good
3   003A7416            2  2016-01-01  Ozone (O3, ppb)        21.0        Good
4   003B7FE0            4  2016-01-03     PM10 (µg/m³)        61.0    Moderate
5   003D5558            3  2016-01-01  Ozone (O3, ppb)        47.0        Good
6   003EE43D            2  2011-01-05  Ozone (O3, ppb)        82.0    Moderate
7   00440C22            3  2005-01-08  Ozone (O3, ppb)        53.0        Good
8   004940C3            3  2015-01-04     PM10 (µg/m³)       158.0   Very Poor
9   005503F2            5  2020-01-07     PM10 (µg/m³)        51.0    Moderate
10  0059F9C2            5  2012-01-05  Ozone (O3, ppb)        83.0    Moderate
11  00604AF6            3  2001-01-11        SO2 (pp

In [34]:
#reconnect to db
conn = db_connection(config_file = 'joinforce.ini')

# 5 queries that i was already experimenting with
queries = {
    "District & Location": """
        SELECT d.district_id, d.district_name, l.longitude, l.latitude 
        FROM joinforce_db.district d 
        JOIN joinforce_db.location l ON d.district_id = l.location_id LIMIT 0, 100;
    """,
    
    "Air Quality Per District": """
        SELECT d.district_id, d.district_name, a.date, a.measure, a.data_value, a.air_quality
        FROM joinforce_db.district d
        JOIN joinforce_db.air_quality a ON d.district_id = a.district_id
        ORDER BY d.district_id, a.date DESC;
    """,

    "District, Location & Air Quality": """
        SELECT d.district_name AS district_name, l.longitude, l.latitude, a.date, a.measure, a.data_value, a.air_quality
        FROM joinforce_db.air_quality a
        JOIN joinforce_db.district d ON a.district_id = d.district_id
        JOIN joinforce_db.location l ON d.district_id = l.location_id 
        ORDER BY a.date DESC;
    """,

    "Average Air Quality Per District": """
        SELECT d.district_name AS district_name, a.measure, AVG(a.data_value) AS avg_value
        FROM joinforce_db.air_quality a
        JOIN joinforce_db.district d ON a.district_id = d.district_id
        GROUP BY d.district_name, a.measure
        ORDER BY d.district_name;
    """,

    "Worst Air Quality District": """
        SELECT d.district_name AS district_name, a.measure, a.data_value, a.air_quality
        FROM joinforce_db.air_quality a
        JOIN joinforce_db.district d ON a.district_id = d.district_id
        ORDER BY FIELD(a.air_quality, 'Unhealthy', 'Moderate', 'Good') ASC
        LIMIT 1;
    """
}

try:
    if conn.is_connected():
        cursor = conn.cursor()

        for query_name, sql_query in queries.items():
            cursor.execute(sql_query)
            rows = cursor.fetchall()

            # Get column names from cursor description
            column_names = [desc[0] for desc in cursor.description]

            # Create DataFrame
            df_result = pd.DataFrame(rows, columns=column_names)

            # Display DataFrame
            print(f"\n {query_name} Results:")
            print(df_result.head(10))  # Show first 10 rows

except Exception as e:
    print(f"Error while executing queries: {e}")

finally:
    cursor.close()
    conn.close()
    print("MySQL connection is closed.")


 District & Location Results:
   district_id district_name  longitude  latitude
0            1        Mumbai    72.8777   19.0760
1            2       Kolkata    88.3639   22.5726
2            3     Bengaluru    77.5946   12.9716
3            4       Chennai    80.2707   13.0827
4            5       Chennai    80.2707   13.0827
5            6         Delhi    77.1025   28.7041
6            7       Kolkata    88.3639   22.5726
7            8     Bengaluru    77.5946   12.9716
8            9     Bengaluru    77.5946   12.9716
9           10       Kolkata    88.3639   22.5726

 Air Quality Per District Results:
   district_id district_name        date          measure  data_value  \
0         1690         Delhi  2021-01-12  Ozone (O3, ppb)        24.0   
1         1690         Delhi  2021-01-09        SO2 (ppb)        48.0   
2         1690         Delhi  2021-01-08        NO2 (ppb)        10.0   
3         1690         Delhi  2020-01-12  Ozone (O3, ppb)        38.0   
4         1690    