In [2]:
import os, zipfile, json, pyspark, requests
from io import BytesIO
from pyspark.sql import SparkSession
import pandas as pd


# Define dataset, Spark, and database constants, plus the Kaggle credentials path.
# Secret or machine-specific values (the DB password and the JDBC jar location)
# can be overridden with environment variables; when a variable is unset, the
# value this notebook originally hard-coded is used, so existing setups still run.
APPNAME = 'Roberts_Systool_Project'
MASTER = 'local[3]'  # specify Spark to use 3 cores since there are no worker nodes
JDBC_JAR_PATH = os.environ.get(
    "JDBC_JAR_PATH",
    "/home/jrobe/.local/lib/python3.10/site-packages/pyspark/pyspark_jars/postgresql-42.7.4.jar",
)
JDBC_URL = "jdbc:postgresql://localhost:5432/postgres"
DB_PROPERTIES = {
    "user": "postgres",
    # Prefer POSTGRES_PASSWORD from the environment so the secret need not live in the notebook
    "password": os.environ.get("POSTGRES_PASSWORD", "postgres_pw"),
    "driver": "org.postgresql.Driver",
}
KAGGLE_JSON_PATH = "~/.kaggle/kaggle.json"
# The double underscore in the name is kept because the download step refers to it as-is
KAGGLE__DATASET_URL = 'https://www.kaggle.com/api/v1/datasets/download/cnrieiit/mqttset'
# Archive members that get loaded into the database
AUGMENTED_DATA_FILES = ['train70_augmented.csv', 'test30_augmented.csv']

# Resolve the kaggle.json location, expanding "~" to the current user's home directory
kaggle_json_path = os.path.expanduser(KAGGLE_JSON_PATH)

# Read the credentials file and parse its JSON content into a dict
with open(kaggle_json_path, 'r') as f:
    kaggle_creds = json.loads(f.read())

# Pull out the two fields passed as HTTP auth when the dataset is downloaded
KAGGLE_USERNAME, KAGGLE_KEY = kaggle_creds['username'], kaggle_creds['key']

# Create the local-host configuration object for Spark.
# The chain is wrapped in parentheses instead of backslash continuations; the
# original ended with a dangling "\" that only parsed because of the line after it.
config = (
    pyspark.SparkConf()
    .set('spark.driver.host', '127.0.0.1')
    .setAppName(APPNAME)
    .setMaster(MASTER)
)

# Initialize (or reuse) the Spark session with the PostgreSQL JDBC driver on its jar list.
# The application name comes from APPNAME inside `config`. The separate
# .appName("Kaggle_Spark_PostgreSQL") call that used to sit here was replaced by
# the conf's spark.app.name as soon as .config(conf=config) ran, so it was dropped.
spark = (
    SparkSession.builder
    .config(conf=config)
    .config("spark.jars", JDBC_JAR_PATH)
    .getOrCreate()
)

# Download the dataset zip into memory and load the selected CSV members into PostgreSQL.
# NOTE(review): "your_table_name" looks like a template placeholder — confirm the
# intended target table. The value is unchanged; it is only named once here.
TABLE_NAME = "your_table_name"

# The (connect, read) timeout makes a stalled connection raise instead of
# hanging the cell indefinitely; the read timeout applies per socket read,
# not to the whole download.
response = requests.get(
    KAGGLE__DATASET_URL,
    auth=(KAGGLE_USERNAME, KAGGLE_KEY),
    timeout=(10, 120),
)
if response.status_code == 200:
    print("Dataset downloaded successfully!")
    # Keep the archive in memory; nothing is written to disk
    dataset_zip = BytesIO(response.content)
    # The first file actually written replaces the table (which also creates
    # its schema); every later file is appended to it.
    overwrite = True
    with zipfile.ZipFile(dataset_zip, 'r') as z:
        # List the archive once instead of on every loop iteration
        archive_members = set(z.namelist())
        for file in AUGMENTED_DATA_FILES:
            if file not in archive_members:
                # Report the skip instead of silently ignoring a missing file
                print(f"Skipping {file}: not found in the downloaded archive")
                continue

            # Parse the CSV straight from the archive into a pandas DataFrame
            with z.open(file) as f:
                df = pd.read_csv(f)
            spark_df = spark.createDataFrame(df)  # convert pandas to spark

            # Show the first 5 rows to verify
            spark_df.show(5)

            spark_df.write.jdbc(
                url=JDBC_URL,
                table=TABLE_NAME,
                mode="overwrite" if overwrite else "append",
                properties=DB_PROPERTIES,
            )
            overwrite = False

else:
    print(f"Failed to download dataset: {response.status_code}")


your 131072x1 screen size is bogus. expect trouble
24/10/10 20:17:32 WARN Utils: Your hostname, Jonathan-Laptop resolves to a loopback address: 127.0.1.1; using 10.255.255.254 instead (on interface lo)
24/10/10 20:17:32 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
24/10/10 20:17:33 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


Dataset downloaded successfully!
