In [31]:
import pandas as pd
import os
import numpy as np
parquet_path = '../local_data/gold/gold_program_engagement/part-00000-tid-1795281215647571771-4d650787-b0f2-405a-9083-71b2428618df-4619-1-c000.snappy.parquet'
assert os.path.exists(parquet_path), f"File not found: {parquet_path}"

# Load Parquet file into DataFrame
df_programs = pd.read_parquet(parquet_path, engine='pyarrow')

print(df_programs.head())


                                                name  \
0          Renewable Energies and Energy Efficiency    
1  Civil Engineering with Second Major in Entrepr...   
2  Mechanical Engineering with Second Major in En...   
3  Aerospace Engineering with Second Major in Ent...   
4  Natural Sciences at the Faculty of Environment...   

                 program_id              institute_id  \
0  67cee87f6400e7152f7c15db  67c5b84d8e2c2e341bf3fab8   
1  66dac376625be65e46d8905d  66d823c2c6e125955aa84c48   
2  66deaaa5625be65e46f28b4a  66d823c2c6e125955aa84c48   
3  66dea9a1625be65e46f2781a  66d823c2c6e125955aa84c48   
4  668849f6832f8e31a77b65f7  6681a3b9832f8e31a753bf44   

                     institute_name    countries  program_duration  tuition  \
0            University of Zaragoza      [Spain]                48  1378.72   
1  Nanyang Technological University  [Singapore]                48  5262.50   
2  Nanyang Technological University  [Singapore]                48  5262.50   
3  N

In [92]:
import psycopg2
from dotenv import load_dotenv
import os

# Load env vars
load_dotenv()

conn = psycopg2.connect(
    host=os.getenv("PG_HOST"),
    port=os.getenv("PG_PORT"),
    user=os.getenv("PG_USER"),
    password=os.getenv("PG_PASSWORD"),
    dbname=os.getenv("PG_DB")
)

cur = conn.cursor()

In [68]:
conn.rollback()

In [25]:
with conn.cursor() as cur:
    # Fetch institution_id and institution_name
    cur.execute("SELECT institution_id, institution_name FROM institutions;")
    institution_map = {name: inst_id for inst_id, name in cur.fetchall()}
    
    # Fetch country_id and institute_country
    cur.execute("SELECT country_id, institute_country FROM countries;")
    country_map = {country: cid for cid, country in cur.fetchall()}

print("Institution mapping sample:", list(institution_map.items())[:3])
print("Country mapping sample:", list(country_map.items())[:3])

Institution mapping sample: [('University of Helsinki', 92), ('VAMIA', 93), ('Univacity', 94)]
Country mapping sample: [('United Kingdom', 1), ('Portugal', 2), ('Turkey', 3)]


In [34]:
import numpy as np

# Extract institutions (this is fine)
institutions_in_df = set(df_programs['institute_name'].unique())

# Extract countries manually since 'countries' column holds lists/arrays
countries_in_df = set()
for cell in df_programs['countries']:
    # If the cell is list-like, get the first element
    if isinstance(cell, (list, tuple, np.ndarray)):
        if len(cell) > 0:
            countries_in_df.add(cell[0])
    else:
        countries_in_df.add(cell)

print("Institutions in DF:", list(institutions_in_df)[:5])
print("Countries in DF:", list(countries_in_df)[:5])

Institutions in DF: ['Schiller International University', 'Romanian-American University', 'Nanjing Vocational University of Industry Technology', 'Politecnico di Milano', 'Centennial College']
Countries in DF: ['Hungary', 'Austria', 'Algeria', 'Canada', None]


In [35]:
# Convert your institution_map and country_map keys to sets for easy lookup
existing_institutions = set(institution_map.keys())
existing_countries = set(country_map.keys())

# Find missing institutions and countries
missing_institutions = [inst for inst in institutions_in_df if inst not in existing_institutions]
missing_countries = [c for c in countries_in_df if c is not None and c not in existing_countries]

print("Missing institutions:", missing_institutions)
print("Missing countries:", missing_countries)

Missing institutions: ['Nanjing Vocational University of Industry Technology', 'University of Graz', 'University of Twente', 'University of Glasgow', 'Vilnius University', 'European University of Tirana', 'Kaunas University of Technology', 'University of Alicante', 'Pompeu Fabra University', 'University of Salamanca', 'University of Manitoba', 'Alexandru Ioan Cuza University of Iasi (UAIC)', 'Nanyang Technological University', 'University of Granada', 'Lunex University', 'Tampere University', 'Blanquerna - Universitat Ramon Llull (URL)']
Missing countries: ['Algeria', 'Lithuania', 'Singapore', 'Albania']


In [40]:
with conn.cursor() as cur:
    cur.execute("SELECT setval('countries_country_id_seq', (SELECT MAX(country_id) FROM countries));")
    conn.commit()

In [41]:
from datetime import datetime

insert_country_sql = """
INSERT INTO countries (institute_country, created_at, updated_at)
VALUES (%s, %s, %s)
RETURNING country_id;
"""

now = datetime.utcnow()
with conn.cursor() as cur:
    for country in missing_countries:
        # Insert new country and get the new ID
        cur.execute(insert_country_sql, (country, now, now))
        new_id = cur.fetchone()[0]
        country_map[country] = new_id
    conn.commit()

In [43]:
import numpy as np
from datetime import datetime

insert_institution_sql = """
INSERT INTO institutions (institution_name, country_id, created_at, updated_at)
VALUES (%s, %s, %s, %s)
RETURNING institution_id;
"""

now = datetime.utcnow()

with conn.cursor() as cur:
    for institution in missing_institutions:
        country_list = df_programs.loc[df_programs['institute_name'] == institution, 'countries'].values
        
        if len(country_list) > 0:
            raw_country = country_list[0]
            # Extract country name if it's a list or ndarray
            if isinstance(raw_country, (list, np.ndarray)):
                country_name = raw_country[0] if len(raw_country) > 0 else None
            else:
                country_name = raw_country
        else:
            country_name = None
        
        country_id = country_map.get(country_name, None)
        
        cur.execute(insert_institution_sql, (institution, country_id, now, now))
        new_inst_id = cur.fetchone()[0]
        institution_map[institution] = new_inst_id

conn.commit()

In [45]:
df_programs['program_id'] = pd.to_numeric(df_programs['program_id'], errors='coerce').astype('Int64')
df_programs['institute_id'] = pd.to_numeric(df_programs['institute_id'], errors='coerce').astype('Int64')

# Check for any nulls that appeared due to conversion errors
print("Nulls in program_id after conversion:", df_programs['program_id'].isnull().sum())
print("Nulls in institute_id after conversion:", df_programs['institute_id'].isnull().sum())

Nulls in program_id after conversion: 5977
Nulls in institute_id after conversion: 5977


In [47]:
with conn.cursor() as cur:
    cur.execute("""
        SELECT column_name, data_type 
        FROM information_schema.columns 
        WHERE table_name = 'programs' 
          AND column_name IN ('program_id', 'institution_id');
    """)
    results = cur.fetchall()
    for column_name, data_type in results:
        print(f"{column_name}: {data_type}")

institution_id: integer
program_id: integer


In [48]:
# Assuming you already have institution_map like: {'University of Helsinki': 92, ...}

# Create a new column institution_id by mapping institute_name using institution_map
df_programs['institution_id_mapped'] = df_programs['institute_name'].map(institution_map)

# Check how many nulls (missing mappings) you get
null_count = df_programs['institution_id_mapped'].isnull().sum()
print(f"Nulls in institution_id after mapping: {null_count}")

# Preview the mapping result
print(df_programs[['institute_name', 'institution_id_mapped']].head())

Nulls in institution_id after mapping: 0
                     institute_name  institution_id_mapped
0            University of Zaragoza                    178
1  Nanyang Technological University                    202
2  Nanyang Technological University                    202
3  Nanyang Technological University                    202
4                University of Graz                    191


In [49]:
# Extract first country from the 'countries' list column
df_programs['country_name'] = df_programs['countries'].apply(lambda x: x[0] if isinstance(x, list) and len(x) > 0 else None)

# Map country names to country IDs using the country_map dictionary
df_programs['country_id_mapped'] = df_programs['country_name'].map(country_map)

# Check for nulls after mapping countries
null_country_count = df_programs['country_id_mapped'].isnull().sum()
print(f"Nulls in country_id after mapping: {null_country_count}")

# Preview the result
print(df_programs[['country_name', 'country_id_mapped']].head())

Nulls in country_id after mapping: 5977
  country_name  country_id_mapped
0         None                NaN
1         None                NaN
2         None                NaN
3         None                NaN
4         None                NaN


In [50]:
print(df_programs['countries'].head(10))

0             [Spain]
1         [Singapore]
2         [Singapore]
3         [Singapore]
4           [Austria]
5    [United Kingdom]
6         [Singapore]
7         [Singapore]
8    [United Kingdom]
9    [United Kingdom]
Name: countries, dtype: object


In [51]:
empty_or_none = df_programs['countries'].apply(lambda x: (x is None) or (len(x) == 0)).sum()
print(f"Rows with None or empty country list: {empty_or_none}")

Rows with None or empty country list: 8


In [58]:
import numpy as np

def extract_country_name(country_arr):
    if (isinstance(country_arr, list) or isinstance(country_arr, np.ndarray)) and len(country_arr) > 0:
        return country_arr[0]
    else:
        return None

df_programs['country_name'] = df_programs['countries'].apply(extract_country_name)

In [59]:
df_programs['country_id_mapped'] = df_programs['country_name'].map(country_map)

print(f"Nulls in country_id after mapping: {df_programs['country_id_mapped'].isna().sum()}")

Nulls in country_id after mapping: 8


In [60]:
missing_countries_df = df_programs[df_programs['country_id_mapped'].isnull()][['countries', 'country_name']]
print(missing_countries_df.head(10))

     countries country_name
701       None         None
4276      None         None
4605      None         None
4671      None         None
4859      None         None
5145      None         None
5693      None         None
5912      None         None


In [62]:
print(df_programs.dtypes)

name                      object
program_id                 Int64
institute_id               Int64
institute_name            object
countries                 object
program_duration           int64
tuition                  float64
total_views                int64
total_impressions          int64
avg_ctr                  float64
rank_by_ctr                int32
institution_id_mapped      int64
country_name              object
country_id_mapped        float64
dtype: object


In [63]:
df_programs.rename(columns={
    'name': 'program_name',
    'program_duration': 'duration_months',
    'avg_ctr': 'ctr',
    'institution_id_mapped': 'institution_id',
    'country_id_mapped': 'country_id'
}, inplace=True)

In [64]:
df_programs = df_programs.dropna(subset=['country_id'])  
df_programs['country_id'] = df_programs['country_id'].astype(int)

In [65]:
from datetime import datetime
now = datetime.utcnow()
df_programs['created_at'] = now
df_programs['updated_at'] = now

In [81]:
conn.rollback()

In [85]:
from psycopg2.extras import execute_batch

insert_sql = """
INSERT INTO programs (
    program_name, institution_id, country_id, duration_months,
    tuition, total_views, total_impressions, ctr,
    created_at, updated_at
) VALUES (
    %(program_name)s, %(institution_id)s, %(country_id)s, %(duration_months)s,
    %(tuition)s, %(total_views)s, %(total_impressions)s, %(ctr)s,
    %(created_at)s, %(updated_at)s
)
"""

df_to_insert = df_programs[[
    'program_name', 'institution_id', 'country_id', 'duration_months',
    'tuition', 'total_views', 'total_impressions', 'ctr',
    'created_at', 'updated_at'
]].copy()

df_to_insert = df_to_insert.dropna(subset=['program_name', 'institution_id', 'country_id'])

data_to_insert = df_to_insert.to_dict(orient='records')

with conn.cursor() as cur:
    execute_batch(cur, insert_sql, data_to_insert)
    conn.commit()

In [91]:
with conn.cursor() as cur:
    cur.execute("SELECT program_id, program_name, institution_id, country_id, tuition FROM programs LIMIT 5;")
    rows = cur.fetchall()
    for row in rows:
        print(row)

(1, ' Renewable Energies and Energy Efficiency ', 178, 8, Decimal('1378.72'))
(2, 'Civil Engineering with Second Major in Entrepreneurship', 202, 32, Decimal('5262.50'))
(3, 'Mechanical Engineering with Second Major in Entrepreneurship', 202, 32, Decimal('5262.50'))
(4, 'Aerospace Engineering with Second Major in Entrepreneurship\u200b', 202, 32, Decimal('5262.50'))
(5, 'Natural Sciences at the Faculty of Environmental, Regional and Educational Sciences', 191, 11, Decimal('363.36'))
