In [4]:
!pip install sqlalchemy psycopg2-binary

Defaulting to user installation because normal site-packages is not writeable
Collecting sqlalchemy
  Downloading sqlalchemy-2.0.46-cp312-cp312-win_amd64.whl.metadata (9.8 kB)
Collecting psycopg2-binary
  Downloading psycopg2_binary-2.9.11-cp312-cp312-win_amd64.whl.metadata (5.1 kB)
Collecting greenlet>=1 (from sqlalchemy)
  Downloading greenlet-3.3.1-cp312-cp312-win_amd64.whl.metadata (3.8 kB)
Downloading sqlalchemy-2.0.46-cp312-cp312-win_amd64.whl (2.1 MB)
   ---------------------------------------- 0.0/2.1 MB ? eta -:--:--
   --------- ------------------------------ 0.5/2.1 MB 8.2 MB/s eta 0:00:01
   ----------------------------- ---------- 1.6/2.1 MB 5.2 MB/s eta 0:00:01
   ---------------------------------------- 2.1/2.1 MB 4.5 MB/s  0:00:00
Downloading psycopg2_binary-2.9.11-cp312-cp312-win_amd64.whl (2.7 MB)
   ---------------------------------------- 0.0/2.7 MB ? eta -:--:--
   ----------- ---------------------------- 0.8/2.7 MB 4.2 MB/s eta 0:00:01
   ----------------------- -


[notice] A new release of pip is available: 25.3 -> 26.0
[notice] To update, run: python.exe -m pip install --upgrade pip


In [5]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, explode
import pandas as pd
from sqlalchemy import create_engine

In [42]:
print(" Starting ETL: CSV ‚Üí PostgreSQL\n")

 Starting ETL: CSV ‚Üí PostgreSQL



In [43]:
CSV_FOLDER = r"C:\Users\Vivobook\ready_for_etl_dataframes"

In [44]:
POSTGRES_URL = "postgresql://postgres:12345@localhost:5432/tourism_db"

In [45]:
CSV_TO_TABLE_MAPPING = {
    'df_regions.csv': 'dim_regions',
    'df_main.csv': 'fact_pois_table',
    'df_locations.csv': 'dim_locations',
    'df_theme.csv': 'dim_themes',
    'df_types.csv': 'dim_types',
    'df_reviews.csv': 'dim_reviews',
    'df_amenity.csv': 'dim_amenities',
    'df_images.csv': 'dim_images',
    'df_contacts.csv': 'dim_contacts',
    'df_descriptions.csv': 'dim_descriptions',
    'df_creators.csv': 'dim_creators',
    'df_poi_classes.csv': 'dim_poi_classes',
    'df_covid.csv': 'dim_covid_status'}

In [46]:
csv_path = Path(CSV_FOLDER)
print(" LOADING CSV FILES")
print("="*80 + "\n")

dataframes = {}

for csv_file, table_name in CSV_TO_TABLE_MAPPING.items():
    csv_file_path = csv_path / csv_file
    
    if csv_file_path.exists():
        print(f" Reading: {csv_file}")
        print(f"   Target table: {table_name}")
        try:
            df = pd.read_csv(csv_file_path)
            
            # Store DataFrame
            dataframes[table_name] = df
            
            # Print summary
            print(f"    Loaded: {len(df):,} rows √ó {len(df.columns)} columns")
            
            # Print all column names
            print(f"  Columns ({len(df.columns)}):")
            for i, col in enumerate(df.columns, 1):
                # Print in a nice format, 3 columns per line
                if i % 3 == 1:
                    print(f"      ", end="")
                print(f"{col:<30}", end="")
                if i % 3 == 0 or i == len(df.columns):
                    print()  # New line
            
            # Show sample of first row (optional)
            print(f"    Sample data (first row):")
            for col in df.columns[:5]:  # Show first 5 columns only
                value = df[col].iloc[0] if len(df) > 0 else "N/A"
                # Truncate long values
                value_str = str(value)[:50] + "..." if len(str(value)) > 50 else str(value)
                print(f"      {col}: {value_str}")
            if len(df.columns) > 5:
                print(f"      ... and {len(df.columns) - 5} more columns")
            
            print()  # Empty line between files
        
        except Exception as e:
            print(f" Error reading {csv_file}: {e}\n")
    else:
        print(f" {csv_file} not found, skipping...\n")

if not dataframes:
    print("\n" + "="*80)
    print(" NO CSV FILES WERE LOADED!")
    print("="*80)
    print("\nPlease check:")
    print(f"  1. Folder path: {CSV_FOLDER}")
    print(f"  2. CSV files exist in that folder")
    print(f"  3. CSV file names match the mapping:")
    for csv_file in CSV_TO_TABLE_MAPPING.keys():
        print(f"     ‚Ä¢ {csv_file}")
    exit(1)


 LOADING CSV FILES

 Reading: df_regions.csv
   Target table: dim_regions
    Loaded: 476,506 rows √ó 6 columns
  Columns (6):
      uuid                          latitude                      longitude                     
      postal_code                   city                          region_name                   
    Sample data (first row):
      uuid: 000016ad-998e-3b6a-80b7-25531604fc11
      latitude: 45.233402
      longitude: 6.722158
      postal_code: 73500
      city: Avrieux
      ... and 1 more columns

 Reading: df_main.csv
   Target table: fact_pois_table
    Loaded: 476,528 rows √ó 4 columns
  Columns (4):
      uuid                          label_fr                      label_en                      
      last_update_datatourisme      
    Sample data (first row):
      uuid: 000016ad-998e-3b6a-80b7-25531604fc11
      label_fr: T√©l√©graphe Chappe de Courberon
      label_en: Chappe de Courberon telegraph
      last_update_datatourisme: 2026-02-05 07:10:58.046000+

  df = pd.read_csv(csv_file_path)


    Loaded: 544,145 rows √ó 8 columns
  Columns (8):
      uuid                          contact_id                    legal_name                    
      email                         homepage                      phone                         
      street_address                postal_code                   
    Sample data (first row):
      uuid: 000016ad-998e-3b6a-80b7-25531604fc11
      contact_id: False
      legal_name: nan
      email: marie-annick.blondon@wanadoo.fr
      homepage: https://avrieux.com/telegraphe-chappe/
      ... and 3 more columns

 Reading: df_descriptions.csv
   Target table: dim_descriptions
    Loaded: 438,862 rows √ó 5 columns
  Columns (5):
      uuid                          description_fr                description_en                
      short_description_fr          short_description_en          
    Sample data (first row):
      uuid: 000016ad-998e-3b6a-80b7-25531604fc11
      description_fr: nan
      description_en: nan
      short_descripti

In [47]:
# Create variables for each dataframe
df_regions = dataframes.get('dim_regions')
df_amenity = dataframes.get('dim_amenities')
df_contacts = dataframes.get('dim_contacts')
df_covid = dataframes.get('dim_covid_status')
df_creators = dataframes.get('dim_creators')
df_descriptions = dataframes.get('dim_descriptions')
df_images = dataframes.get('dim_images')
df_main = dataframes.get('fact_pois_table')
df_poi_classes = dataframes.get('dim_poi_classes')
df_reviews = dataframes.get('dim_reviews')
df_theme = dataframes.get('dim_themes')
df_types = dataframes.get('dim_types')


In [48]:
df_regions.head(3)

Unnamed: 0,uuid,latitude,longitude,postal_code,city,region_name
0,000016ad-998e-3b6a-80b7-25531604fc11,45.233402,6.722158,73500,Avrieux,Auvergne-Rh√¥ne-Alpes
1,000018f6-c7e6-3762-9713-5595c558c900,44.57023,0.44739,47800,Montignac-de-Lauzun,Nouvelle-Aquitaine
2,000035ae-b4f4-3ced-9d4d-23287800fd9e,43.7609,2.9526,12360,Brusque,Occitanie


In [49]:
from sqlalchemy import create_engine, text
from sqlalchemy.exc import ProgrammingError
import pandas as pd

# PostgreSQL connection
POSTGRES_USER = 'postgres'
POSTGRES_PASSWORD = '12345'
POSTGRES_HOST = 'localhost'
POSTGRES_PORT = '5432'
DB_NAME = 'tourism_db'

# Connect to default postgres database
default_url = f'postgresql://{POSTGRES_USER}:{POSTGRES_PASSWORD}@{POSTGRES_HOST}:{POSTGRES_PORT}/postgres'
engine_default = create_engine(default_url, isolation_level='AUTOCOMMIT')

try:
    with engine_default.connect() as conn:
        # Check if database exists
        result = conn.execute(text(
            f"SELECT 1 FROM pg_database WHERE datname = '{DB_NAME}'"
        ))
        
        if result.fetchone() is None:
            # Database doesn't exist, create it
            conn.execute(text(f"CREATE DATABASE {DB_NAME}"))
            print(f" Created database: {DB_NAME}\n")
        else:
            print(f" Database already exists: {DB_NAME}\n")
            
except Exception as e:
    print(f" Error: {e}\n")
finally:
    engine_default.dispose()
    
# CONNECT TO TOURISM_DB AND LOAD DATA

POSTGRES_URL = f'postgresql://{POSTGRES_USER}:{POSTGRES_PASSWORD}@{POSTGRES_HOST}:{POSTGRES_PORT}/{DB_NAME}'

try:
    # Connect to PostgreSQL
    engine = create_engine(POSTGRES_URL)
    
    # Test connection
    with engine.connect() as conn:
        print(" Connected to PostgreSQL\n")
    
    # Prepare all tables to load
    star_schema_tables = {
        'dim_regions': df_regions,
        'fact_pois_table': df_main,  # ‚Üê Changed from fact_pois
        'dim_amenities': df_amenity,  # ‚Üê Changed naming
        'dim_contacts': df_contacts,
        'dim_covid_status': df_covid,
        'dim_creators': df_creators,
        'dim_descriptions': df_descriptions,
        'dim_images': df_images,
        'dim_poi_classes': df_poi_classes,
        'dim_reviews': df_reviews,
        'dim_themes': df_theme,
        'dim_types': df_types
    }
    
    # Load in order
    load_order = [
        'dim_regions',
        'fact_pois_table',
        'dim_amenities',
        'dim_contacts',
        'dim_themes',
        'dim_types',
        'dim_reviews',
        'dim_images',
        'dim_descriptions',
        'dim_creators',
        'dim_poi_classes',
        'dim_covid_status'
    ]
    
    total_rows = 0
    loaded_tables = []
    
    print("Loading tables to PostgreSQL...\n")
    
    for table_name in load_order:
        df_table = star_schema_tables.get(table_name)
        
        if df_table is not None and not df_table.empty:
            print(f"   Loading {table_name}...", end=' ')
            try:
                df_table.to_sql(
                    table_name,
                    engine,
                    if_exists='replace',
                    index=False,
                    chunksize=10000,
                    method='multi'
                )
                total_rows += len(df_table)
                loaded_tables.append((table_name, len(df_table)))
                print(f" {len(df_table):,} rows")
            except Exception as e:
                print(f"Error: {e}")
        else:
            print(f"  Skipping {table_name} (no data)")
    
    print("ETL COMPLETE - STAR SCHEMA IN POSTGRESQL!")
 
    
    # Summary
    print(" Summary of Loaded Tables:\n")
    print(f"   {'Table Name':<30} {'Rows':>15}")
    print(f"   {'-'*30} {'-'*15}")
    for table_name, row_count in loaded_tables:
        print(f"   {table_name:<30} {row_count:>15,}")
    print(f"   {'-'*30} {'-'*15}")
    print(f"   {'TOTAL':<30} {total_rows:>15,}\n")
    
    # Verify
    print(" Verifying database...")
    tables_in_db = pd.read_sql(
        "SELECT table_name FROM information_schema.tables WHERE table_schema='public' ORDER BY table_name",
        engine
    )
    print(f" {len(tables_in_db)} tables in database:")
    for table in tables_in_db['table_name']:
        print(f"      ‚Ä¢ {table}")
    
except Exception as e:
    print(f"\n ERROR: {e}")
    print("\nüîß Troubleshooting:")
    print("   1. Is PostgreSQL running?")

finally:
    if 'engine' in locals():
        engine.dispose()

print("\n Done!")

üîß Checking if database exists...

‚úÖ Database already exists: tourism_db

‚úÖ Connected to PostgreSQL

üì§ Loading tables to PostgreSQL...

   Loading dim_regions... ‚úÖ 476,506 rows
   Loading fact_pois_table... ‚úÖ 476,528 rows
   Loading dim_amenities... ‚úÖ 3,512 rows
   Loading dim_contacts... ‚úÖ 544,145 rows
   Loading dim_themes... ‚úÖ 3,631 rows
   Loading dim_types... ‚úÖ 2,497,258 rows
   Loading dim_reviews... ‚úÖ 142,426 rows
   Loading dim_images... ‚úÖ 151,269 rows
   Loading dim_descriptions... ‚úÖ 438,862 rows
   Loading dim_creators... ‚úÖ 476,455 rows
   Loading dim_poi_classes... ‚úÖ 2,141,626 rows
   Loading dim_covid_status... ‚úÖ 5,307 rows

‚úÖ ETL COMPLETE - STAR SCHEMA IN POSTGRESQL!

üìä Summary of Loaded Tables:

   Table Name                                Rows
   ------------------------------ ---------------
   dim_regions                            476,506
   fact_pois_table                        476,528
   dim_amenities                           