## **PROJECT - NOTEBOOK #1: Raw Data**

> 🚧 Run this notebook only once to migrate the data to your DB (It will take a few minutes)

This notebook sets up our data pipeline: it configures the environment, imports the required libraries, downloads the raw CSV files from Kaggle and loads them into Pandas DataFrames, creates a SQLAlchemy engine, transfers the data into the `raw` schema of a PostgreSQL database on GCP Cloud SQL, and verifies the transfer with simple queries.

---

### **Setting Environment**

In [1]:
import os 
print(os.getcwd())

try:
    os.chdir("../../project_etl")

except FileNotFoundError:
    print("""
        FileNotFoundError - The directory may not exist or you might not be in the specified path.
        If this has already worked, do not run this block again, as the current directory is already set to project_etl.
        """)
    
print(os.getcwd())

d:\U\FIFTH SEMESTER\ETL\project_etl\notebooks
d:\U\FIFTH SEMESTER\ETL\project_etl
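
The relative `os.chdir("../../project_etl")` call above only works when the kernel starts in the expected folder. A minimal alternative sketch is shown below. It assumes the project root is the nearest parent directory that contains a `src` folder (suggested by the `from src.database...` import, but still an assumption), and `find_project_root` is a hypothetical helper, not part of the project.

```python
from pathlib import Path


def find_project_root(start: Path, marker: str = "src") -> Path:
    """Walk upward from `start` until a directory containing `marker` is found.

    `marker` is an assumed convention: the project root is taken to be the
    nearest directory that has a `src` subfolder.
    """
    start = start.resolve()
    for candidate in (start, *start.parents):
        if (candidate / marker).is_dir():
            return candidate
    raise FileNotFoundError(f"No parent of {start} contains a '{marker}' directory")


# Possible use in the notebook (safe to run more than once):
#   os.chdir(find_project_root(Path.cwd()))
```

Because the lookup starts from the current directory and returns it when it already qualifies, re-running the cell would leave the working directory unchanged.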


### **Importing modules and libraries**

In [2]:
import pandas as pd
import logging
from sqlalchemy import text
from src.database.db_connection import create_gcp_engine

In [3]:
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

### **Ingest Data**

In [4]:
import kagglehub

# Download latest version
path = kagglehub.dataset_download("arshkon/linkedin-job-postings")
print("Path to dataset files:", path)

dataset_dir = path

dfs = {}

csv_files = {
    'jobs': os.path.join(dataset_dir, "postings.csv"),
    'benefits': os.path.join(dataset_dir, "jobs", "benefits.csv"),
    'companies': os.path.join(dataset_dir, "companies", "companies.csv"),
    'employee_counts': os.path.join(dataset_dir, "companies", "employee_counts.csv"), 
    
    # Additional CSV files, beyond the main ones, needed to build the fact table
    'industries': os.path.join(dataset_dir, "mappings", "industries.csv"), 
    'skills_industries': os.path.join(dataset_dir, "mappings", "skills.csv"),
    'salaries': os.path.join(dataset_dir, "jobs", "salaries.csv"),
}

for name, file_path in csv_files.items():
    if os.path.exists(file_path):
        dfs[name] = pd.read_csv(file_path)
        logger.info(f"Loaded {name} from {file_path} with {len(dfs[name])} rows")
        print(f"\nDataFrame '{name}' loaded successfully!")
        print(f"First 5 rows of '{name}':")
        print(dfs[name].head())
    else:
        logger.error(f"'{file_path}' not found")
        print(f"Error: '{file_path}' not found in {dataset_dir}")
        # List the folder that should contain the file (fall back to the dataset root)
        parent_dir = os.path.dirname(file_path)
        print("Available files:", os.listdir(parent_dir if os.path.isdir(parent_dir) else dataset_dir))

# Access a specific DataFrame (postings.csv is stored under the 'jobs' key)
if 'jobs' in dfs:
    print("\nColumns in jobs DataFrame:")
    print(dfs['jobs'].columns)



Path to dataset files: C:\Users\sebas\.cache\kagglehub\datasets\arshkon\linkedin-job-postings\versions\13


INFO:__main__:Loaded jobs from C:\Users\sebas\.cache\kagglehub\datasets\arshkon\linkedin-job-postings\versions\13\postings.csv with 123849 rows
INFO:__main__:Loaded benefits from C:\Users\sebas\.cache\kagglehub\datasets\arshkon\linkedin-job-postings\versions\13\jobs\benefits.csv with 67943 rows



DataFrame 'jobs' loaded successfully!
First 5 rows of 'jobs':
     job_id            company_name  \
0    921716   Corcoran Sawyer Smith   
1   1829192                     NaN   
2  10998357  The National Exemplar    
3  23221523  Abrams Fensterman, LLP   
4  35982263                     NaN   

                                               title  \
0                              Marketing Coordinator   
1                  Mental Health Therapist/Counselor   
2                        Assitant Restaurant Manager   
3  Senior Elder Law / Trusts and Estates Associat...   
4                                 Service Technician   

                                         description  max_salary pay_period  \
0  Job descriptionA leading real estate firm in N...        20.0     HOURLY   
1  At Aspen Therapy and Wellness , we are committ...        50.0     HOURLY   
2  The National Exemplar is accepting application...     65000.0     YEARLY   
3  Senior Associate Attorney - Elder Law / Trusts

INFO:__main__:Loaded companies from C:\Users\sebas\.cache\kagglehub\datasets\arshkon\linkedin-job-postings\versions\13\companies\companies.csv with 24473 rows
INFO:__main__:Loaded employee_counts from C:\Users\sebas\.cache\kagglehub\datasets\arshkon\linkedin-job-postings\versions\13\companies\employee_counts.csv with 35787 rows
INFO:__main__:Loaded industries from C:\Users\sebas\.cache\kagglehub\datasets\arshkon\linkedin-job-postings\versions\13\mappings\industries.csv with 422 rows
INFO:__main__:Loaded skills_industries from C:\Users\sebas\.cache\kagglehub\datasets\arshkon\linkedin-job-postings\versions\13\mappings\skills.csv with 35 rows
INFO:__main__:Loaded salaries from C:\Users\sebas\.cache\kagglehub\datasets\arshkon\linkedin-job-postings\versions\13\jobs\salaries.csv with 40785 rows



DataFrame 'companies' loaded successfully!
First 5 rows of 'companies':
   company_id                        name  \
0        1009                         IBM   
1        1016               GE HealthCare   
2        1025  Hewlett Packard Enterprise   
3        1028                      Oracle   
4        1033                   Accenture   

                                         description  company_size  state  \
0  At IBM, we do more than work. We create. We cr...           7.0     NY   
1  Every day millions of people feel the impact o...           7.0      0   
2  Official LinkedIn of Hewlett Packard Enterpris...           7.0  Texas   
3  We’re a cloud technology company that provides...           7.0  Texas   
4  Accenture is a leading global professional ser...           7.0      0   

  country              city zip_code                                address  \
0      US  Armonk, New York    10504  International Business Machines Corp.   
1      US           Chicago        
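
The loading loop above logs an error and moves on when a file is missing, so a table could be silently absent from the later database load. One way to fail fast is to check all paths before reading anything. The sketch below is only an illustration: `missing_files` is a hypothetical helper, and whether to stop the notebook on a missing file is a design choice the original does not state.

```python
import os


def missing_files(csv_files: dict[str, str]) -> dict[str, str]:
    """Return the entries of `csv_files` whose paths do not exist on disk."""
    return {
        name: path
        for name, path in csv_files.items()
        if not os.path.isfile(path)
    }


# Possible use before the loading loop:
#   missing = missing_files(csv_files)
#   if missing:
#       raise FileNotFoundError(f"Missing dataset files: {missing}")
```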

### **Transfer data to DB in GCP Cloud SQL (raw schema)**

In [5]:
try:
    engine = create_gcp_engine()
    logger.info("Successfully connected to GCP database")
except Exception as e:
    logger.error(f"Failed to connect to GCP database: {str(e)}")
    raise

INFO:src.database.db_connection:Successfully created GCP database engine
INFO:__main__:Successfully connected to GCP database
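
The body of `create_gcp_engine` is not shown in this notebook. As a rough idea of what such a helper usually needs, the sketch below builds a PostgreSQL connection URL from environment variables. Everything in it is an assumption: the variable names (`DB_USER`, `DB_PASSWORD`, `DB_HOST`, `DB_PORT`, `DB_NAME`), the defaults, and the `psycopg2` driver may differ from what the project actually uses.

```python
import os
from urllib.parse import quote


def build_postgres_url(env=os.environ) -> str:
    """Build a SQLAlchemy-style PostgreSQL URL from (hypothetical) env variables."""
    # Percent-encode credentials so characters like '@' or '/' do not break the URL.
    user = quote(env["DB_USER"], safe="")
    password = quote(env["DB_PASSWORD"], safe="")
    host = env.get("DB_HOST", "localhost")  # assumed default
    port = env.get("DB_PORT", "5432")       # PostgreSQL's default port
    name = env["DB_NAME"]
    return f"postgresql+psycopg2://{user}:{password}@{host}:{port}/{name}"


# Possible use (requires SQLAlchemy and a PostgreSQL driver to be installed):
#   from sqlalchemy import create_engine
#   engine = create_engine(build_postgres_url())
```

Keeping credentials in environment variables rather than in the notebook avoids committing them with the captured output.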


In [7]:
try:
    with engine.connect() as connection:
        result = connection.execute(text('SELECT version();'))
        logger.info(f'PostgreSQL Version: {result.fetchone()[0]}')
except Exception as e:
    logger.error(f'Connection verification failed: {e}')
    raise

#Load each DataFrame into the 'raw' schema
for name, df in dfs.items():
    try:
        with engine.begin() as connection:
            df.to_sql(name.lower(), con=connection, schema='raw', if_exists='replace', index=False)
            logger.info(f"Table '{name}' successfully loaded into raw schema!")
            
            # Validate
            result = connection.execute(text(f"SELECT COUNT(*) FROM raw.{name}"))
            row_count = result.fetchone()[0]
            logger.info(f"Validation: raw.{name} has {row_count} rows")
    except Exception as e:
        logger.error(f"Error loading '{name}' into raw schema: {e}")
        raise

#Verify all tables in the raw schema
try:
    with engine.connect() as connection:
        result = connection.execute(text("SELECT table_name FROM information_schema.tables WHERE table_schema = 'raw';"))
        logger.info('\nTables in raw schema of project-etl database:')
        for row in result:
            logger.info(row[0])
except Exception as e:
    logger.error(f'Verification failed: {e}')
    raise


INFO:__main__:PostgreSQL Version: PostgreSQL 16.8 on x86_64-pc-linux-gnu, compiled by Debian clang version 12.0.1, 64-bit
INFO:__main__:Table 'jobs' successfully loaded into raw schema!
INFO:__main__:Validation: raw.jobs has 123849 rows
INFO:__main__:Table 'benefits' successfully loaded into raw schema!
INFO:__main__:Validation: raw.benefits has 67943 rows
INFO:__main__:Table 'companies' successfully loaded into raw schema!
INFO:__main__:Validation: raw.companies has 24473 rows
INFO:__main__:Table 'employee_counts' successfully loaded into raw schema!
INFO:__main__:Validation: raw.employee_counts has 35787 rows
INFO:__main__:Table 'industries' successfully loaded into raw schema!
INFO:__main__:Validation: raw.industries has 422 rows
INFO:__main__:Table 'skills_industries' successfully loaded into raw schema!
INFO:__main__:Validation: raw.skills_industries has 35 rows
INFO:__main__:Table 'salaries' successfully loaded into raw schema!
INFO:__main__:Validation: raw.salaries has 40785 rows
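
The validation step above logs each table's row count but leaves the comparison with the source DataFrames to the reader. A minimal sketch of an automated check follows. It uses an in-memory `sqlite3` database as a stand-in for the Cloud SQL instance, and `count_rows` and `find_mismatches` are hypothetical helpers. The identifier check is included because the table name is interpolated into the SQL string.

```python
import re
import sqlite3

# Only plain lowercase identifiers are accepted as table names.
_IDENTIFIER = re.compile(r"[a-z_][a-z0-9_]*")


def count_rows(connection: sqlite3.Connection, table: str) -> int:
    """Count rows in `table`, refusing names that are not plain identifiers."""
    if not _IDENTIFIER.fullmatch(table):
        raise ValueError(f"Unsafe table name: {table!r}")
    return connection.execute(f'SELECT COUNT(*) FROM "{table}"').fetchone()[0]


def find_mismatches(
    connection: sqlite3.Connection, expected: dict[str, int]
) -> dict[str, tuple[int, int]]:
    """Map table name -> (expected, actual) for every table whose counts differ."""
    mismatches = {}
    for table, expected_count in expected.items():
        actual = count_rows(connection, table)
        if actual != expected_count:
            mismatches[table] = (expected_count, actual)
    return mismatches
```

In the notebook, the expected counts could come from `{name: len(df) for name, df in dfs.items()}`, and an empty result would mean every table matches its DataFrame.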

In [None]:
# Close the engine
engine.dispose()
logger.info("Closed connection to GCP database.")