In [1]:
pip install pyspark pandas sqlalchemy psycopg2-binary


Collecting pyspark
  Downloading pyspark-3.5.2.tar.gz (317.3 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m317.3/317.3 MB[0m [31m4.7 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
Collecting psycopg2-binary
  Downloading psycopg2_binary-2.9.9-cp310-cp310-manylinux_2_17_x86_64.manylinux2014_x86_64.whl.metadata (4.4 kB)
Downloading psycopg2_binary-2.9.9-cp310-cp310-manylinux_2_17_x86_64.manylinux2014_x86_64.whl (3.0 MB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m3.0/3.0 MB[0m [31m74.0 MB/s[0m eta [36m0:00:00[0m
[?25hBuilding wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.5.2-py2.py3-none-any.whl size=317812365 sha256=cec1352250dfa5ff9e119ae929d7dbbe4782a0a08f7dd85ad23575ed453be133
  Stored in directory: /root/.cache/pip/wheels/34/34/bd/03944534c44b677cd5859f248090daa9fb27b3c8f8e5f49574
Successfully 

In [2]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import *
import os

# Obtain the Spark session for this notebook; getOrCreate() hands back an
# already-running session when there is one instead of starting a second.
spark = (
    SparkSession.builder
    .appName("DP - Countries")
    .getOrCreate()
)

In [3]:
import os

# Directory (relative to the current working directory) for the countries dataset
directory_path = 'dataset/countries/'

# Create the directory, including any missing parents. exist_ok=True keeps the
# cell safe to re-run: without it os.makedirs raises FileExistsError as soon
# as the directory is already present.
os.makedirs(directory_path, exist_ok=True)

In [5]:
# Location of the raw countries CSV
csv_file = '/content/dataset/countries/countries of the world.csv'

# Load the CSV with Spark: the first row provides the column names and the
# column types are inferred from the data.
df = spark.read.csv(
    path=csv_file,
    inferSchema=True,
    header=True,
)

In [6]:
# Explicit schema for the countries data, written as (column name, Spark type)
# pairs and expanded into nullable StructFields in the same order.
# NOTE(review): in the cells visible here the CSV is read with inferSchema=True
# and this schema is not passed to the reader — confirm whether it should be.
schema = StructType([
    StructField(column_name, column_type(), True)
    for column_name, column_type in [
        ("Country", StringType),
        ("Region", StringType),
        ("Population", IntegerType),
        ("Area (sq. mi.)", DoubleType),
        ("Pop. Density (per sq. mi.)", DoubleType),
        ("Coastline (coast/area ratio)", DoubleType),
        ("Net migration", DoubleType),
        ("Infant mortality (per 1000 births)", DoubleType),
        ("GDP ($ per capita)", IntegerType),
        ("Literacy (%)", DoubleType),
        ("Phones (per 1000)", DoubleType),
        ("Arable (%)", DoubleType),
        ("Crops (%)", DoubleType),
        ("Other (%)", DoubleType),
        ("Climate", DoubleType),
        ("Birthrate", DoubleType),
        ("Deathrate", DoubleType),
        ("Agriculture", DoubleType),
        ("Industry", DoubleType),
        ("Service", DoubleType),
    ]
])

In [7]:
# Cast selected columns to explicit numeric types.
#
# Literacy is normalised before the cast: a string such as "36,0" cannot be
# parsed as a double and would become NULL, which the fill(0) below would then
# silently turn into 0. Replacing "," with "." first lets such values parse;
# values that already use a dot (or an already-numeric column) are unaffected.
# NOTE(review): assumes any comma in this column is a decimal separator, not a
# thousands separator — confirm against the CSV.
df = (
    df
    .withColumn("Population", col("Population").cast(IntegerType()))
    .withColumn("GDP ($ per capita)", col("GDP ($ per capita)").cast(IntegerType()))
    .withColumn(
        "Literacy (%)",
        regexp_replace(col("Literacy (%)").cast(StringType()), ",", ".").cast(DoubleType()),
    )
)

# Replace remaining nulls with 0 in the numerical columns (an integer fill
# value leaves string columns untouched).
df = df.na.fill(0)

In [8]:
# Persist the transformed frame as Parquet.
# The writer's default save mode raises when the target path already exists,
# so a second run of this cell would fail; overwrite makes the cell re-runnable.
# NOTE(review): this writes under './datasets/' while the input directory used
# earlier is 'dataset/' — confirm the differing name is intentional.
output_path = './datasets/countries/countries_data.parquet'
df.write.mode("overwrite").parquet(output_path)


In [9]:
# If running locally
# Provision a PostgreSQL server on this machine for the notebook to write into.
# Refresh the apt package index, then install the server and start its service.
!apt update
!apt install -y postgresql
!/etc/init.d/postgresql start
# As the postgres superuser: create the target database, create the login role
# used by the notebook, and grant that role all privileges on the database.
# The database name, role name and password here are the same values the
# connection cell further down uses.
# NOTE(review): the role's password is hard-coded in plain text — acceptable
# only for a throwaway local instance.
!sudo -u postgres psql -c "CREATE DATABASE countries_db;"
!sudo -u postgres psql -c "CREATE USER developer WITH PASSWORD 'test';"
!sudo -u postgres psql -c "GRANT ALL PRIVILEGES ON DATABASE countries_db TO developer;"

[33m0% [Working][0m            Get:1 http://security.ubuntu.com/ubuntu jammy-security InRelease [129 kB]
Get:2 https://cloud.r-project.org/bin/linux/ubuntu jammy-cran40/ InRelease [3,626 B]
Hit:3 http://archive.ubuntu.com/ubuntu jammy InRelease
Hit:4 https://developer.download.nvidia.com/compute/cuda/repos/ubuntu2204/x86_64  InRelease
Get:5 http://archive.ubuntu.com/ubuntu jammy-updates InRelease [128 kB]
Ign:6 https://r2u.stat.illinois.edu/ubuntu jammy InRelease
Get:7 https://r2u.stat.illinois.edu/ubuntu jammy Release [5,713 B]
Hit:8 http://archive.ubuntu.com/ubuntu jammy-backports InRelease
Get:9 https://r2u.stat.illinois.edu/ubuntu jammy Release.gpg [793 B]
Get:10 https://ppa.launchpadcontent.net/deadsnakes/ppa/ubuntu jammy InRelease [18.1 kB]
Get:11 http://security.ubuntu.com/ubuntu jammy-security/main amd64 Packages [2,173 kB]
Hit:12 https://ppa.launchpadcontent.net/graphics-drivers/ppa/ubuntu jammy InRelease
Hit:13 https://ppa.launchpadcontent.net/ubuntugis/ppa/ubuntu jammy 

In [10]:
import psycopg2
import pandas as pd
from sqlalchemy import create_engine

# Round-trip the Spark data through PostgreSQL: write it to the `countries`
# table, then read the table back into pandas.
from urllib.parse import quote_plus

# Connection parameters. The user name and password can be supplied through
# environment variables so real credentials do not have to live in the
# notebook; the fallbacks are the throwaway values of the local setup.
host = "localhost"
port = 5432
dbname = "countries_db"
user = os.environ.get("COUNTRIES_DB_USER", "developer")
password = os.environ.get("COUNTRIES_DB_PASSWORD", "test")

# Build the connection string. User and password are URL-escaped so that
# characters such as '@', ':' or '/' in a credential cannot corrupt the URL;
# for the default values the resulting string is unchanged.
conn_str = f'postgresql://{quote_plus(user)}:{quote_plus(password)}@{host}:{port}/{dbname}'

# Create SQLAlchemy engine
engine = create_engine(conn_str)

# Convert Spark DataFrame to Pandas DataFrame (collects all rows to the driver)
df_pandas = df.toPandas()

# Write DataFrame to PostgreSQL table, replacing the table if it already exists
df_pandas.to_sql('countries', engine, if_exists='replace', index=False)

print("Data written to PostgreSQL table successfully.")


# Query to fetch the data back
query = "SELECT * FROM countries;"

# Fetch data into a DataFrame
df_query = pd.read_sql(query, engine)

# Display the DataFrame
print(df_query)

Data written to PostgreSQL table successfully.
             Country                               Region  Population  \
0       Afghanistan         ASIA (EX. NEAR EAST)             31056997   
1           Albania   EASTERN EUROPE                          3581655   
2           Algeria   NORTHERN AFRICA                        32930091   
3    American Samoa   OCEANIA                                   57794   
4           Andorra   WESTERN EUROPE                            71201   
..               ...                                  ...         ...   
222       West Bank   NEAR EAST                               2460492   
223  Western Sahara   NORTHERN AFRICA                          273008   
224           Yemen   NEAR EAST                              21456188   
225          Zambia   SUB-SAHARAN AFRICA                     11502010   
226        Zimbabwe   SUB-SAHARAN AFRICA                     12236805   

     Area (sq. mi.) Pop. Density (per sq. mi.) Coastline (coast/area ratio) 