In [1]:
# Import libraries
import zipfile
import os
from time import time
import pandas as pd
import pyarrow.parquet as pq
import pyarrow.dataset as ds
from sqlalchemy import create_engine

In [2]:
# Extract CSV files from zipped archive 
zip_path = '..\data\zipped.zip'
csv_path = '..\data\csv_files'

# Create a directory to extract the CSV files
os.makedirs(csv_path, exist_ok=True)

# Extract the ZIP file
with zipfile.ZipFile(zip_path, 'r') as zip_ref:
    zip_ref.extractall(csv_path)

**Convert CSV files to Parquet files**

In [3]:
parquet_path = '..\data\parquet_files'
os.makedirs(parquet_path, exist_ok=True)

def convert_csv_to_parquet(csv_file, parquet_file):
    df = pd.read_csv(csv_file)
    df.to_parquet(parquet_file)

# Iterate through extracted files 
for root, dirs, files in os.walk(csv_path):
    for file in files:
        if file.endswith('.csv'):
            csv_file_path = os.path.join(root, file)
            parquet_file_path = os.path.join(parquet_path, os.path.splitext(file)[0] + '.parquet')
            convert_csv_to_parquet(csv_file_path, parquet_file_path)

In [4]:
# List Parquet files 
parquet_files = [f for f in os.listdir(parquet_path) if f.endswith('.parquet')]

print("List of Parquet files:")
for parquet_file in parquet_files:
    print(parquet_file)

List of Parquet files:
benefits.parquet
companies.parquet
company_industries.parquet
company_specialities.parquet
employee_counts.parquet
industries.parquet
job_industries.parquet
job_postings.parquet
job_skills.parquet
salaries.parquet
skills.parquet


In [5]:
# Read the metadata for each Parquet file
for parquet_file in parquet_files:
    parquet_file_path = os.path.join(parquet_path, parquet_file)
    metadata = pq.read_metadata(parquet_file_path)
    print(f"Metadata for {parquet_file}:")
    print(metadata)
    print("\n")

Metadata for benefits.parquet:
<pyarrow._parquet.FileMetaData object at 0x000001D68BD96DB0>
  created_by: parquet-cpp-arrow version 15.0.2
  num_columns: 3
  num_rows: 29325
  num_row_groups: 1
  format_version: 2.6
  serialized_size: 2391


Metadata for companies.parquet:
<pyarrow._parquet.FileMetaData object at 0x000001D694AAE4F0>
  created_by: parquet-cpp-arrow version 15.0.2
  num_columns: 10
  num_rows: 11361
  num_row_groups: 1
  format_version: 2.6
  serialized_size: 8304


Metadata for company_industries.parquet:
<pyarrow._parquet.FileMetaData object at 0x000001D694AB9E00>
  created_by: parquet-cpp-arrow version 15.0.2
  num_columns: 2
  num_rows: 12601
  num_row_groups: 1
  format_version: 2.6
  serialized_size: 2002


Metadata for company_specialities.parquet:
<pyarrow._parquet.FileMetaData object at 0x000001D694AAE4F0>
  created_by: parquet-cpp-arrow version 15.0.2
  num_columns: 2
  num_rows: 78405
  num_row_groups: 1
  format_version: 2.6
  serialized_size: 1996


Metadata

In [6]:
# Check schema for each Parquet file
for parquet_file in parquet_files:
    parquet_file_path = os.path.join(parquet_path, parquet_file)
    file = pq.ParquetFile(parquet_file_path)
    table = file.read()
    print(f"Schema for {parquet_file}:")
    for field in table.schema:
        print(f"{field.name}: {field.type}")
    print("\n")

Schema for benefits.parquet:
job_id: int64
inferred: int64
type: string


Schema for companies.parquet:
company_id: int64
name: string
description: string
company_size: double
state: string
country: string
city: string
zip_code: string
address: string
url: string


Schema for company_industries.parquet:
company_id: int64
industry: string


Schema for company_specialities.parquet:
company_id: int64
speciality: string


Schema for employee_counts.parquet:
company_id: int64
employee_count: int64
follower_count: int64
time_recorded: double


Schema for industries.parquet:
industry_id: int64
industry_name: string


Schema for job_industries.parquet:
job_id: int64
industry_id: int64


Schema for job_postings.parquet:
job_id: int64
company_id: double
title: string
description: string
max_salary: double
med_salary: double
min_salary: double
pay_period: string
formatted_work_type: string
location: string
applies: double
original_listed_time: double
remote_allowed: double
views: double
job_posti

**Load data into PostgreSQL**

In [9]:
# Create a SQL database connection
engine = create_engine('postgresql://root:root@localhost:5432/ln_job_postings')
engine.connect()

<sqlalchemy.engine.base.Connection at 0x1d694aa8d30>

In [11]:
t_start = time()

# Iterate over each Parquet file
for parquet_file in parquet_files:
    print(f"\nProcessing {parquet_file}...")
    
    # Load the Parquet file into a PyArrow Table
    table = pq.read_table(os.path.join(parquet_path, parquet_file))
    table_name = os.path.splitext(parquet_file)[0]

    # Convert the PyArrow Table to a pandas DataFrame
    df = table.to_pandas()
    
    # Insert the DataFrame into the database
    print(f"Inserting {parquet_file} into database...")
    df.to_sql(name=table_name, con=engine, index=False)
    print(f'Completed !')

t_end = time()   
print(f'Total time taken was {t_end-t_start:10.3f} seconds.')


Processing benefits.parquet...
Inserting benefits.parquet into database...
Completed !

Processing companies.parquet...
Inserting companies.parquet into database...
Completed !

Processing company_industries.parquet...
Inserting company_industries.parquet into database...
Completed !

Processing company_specialities.parquet...
Inserting company_specialities.parquet into database...
Completed !

Processing employee_counts.parquet...
Inserting employee_counts.parquet into database...
Completed !

Processing industries.parquet...
Inserting industries.parquet into database...
Completed !

Processing job_industries.parquet...
Inserting job_industries.parquet into database...
Completed !

Processing job_postings.parquet...
Inserting job_postings.parquet into database...
Completed !

Processing job_skills.parquet...
Inserting job_skills.parquet into database...
Completed !

Processing salaries.parquet...
Inserting salaries.parquet into database...
Completed !

Processing skills.parquet...
In