In [7]:
%%sql
-- Create the `climate` database (namespace) used by the cells below.
-- IF NOT EXISTS makes this cell safe to re-run.
CREATE DATABASE IF NOT EXISTS climate;

24/12/22 09:12:05 WARN SparkSession: Using an existing Spark session; only runtime SQL configurations will take effect.


In [8]:
%%sql
-- Weather observations stored as an Iceberg table.
-- IF NOT EXISTS makes this cell safe to re-run; an existing table is left untouched.
CREATE TABLE IF NOT EXISTS climate.weather (
    datetime TIMESTAMP,       -- observation timestamp; also drives partitioning
    temp DOUBLE,
    lat DOUBLE,
    long DOUBLE,
    cloud_coverage STRING,    -- free-text sky condition, e.g. 'Sunny'
    precip DOUBLE,
    wind_speed DOUBLE
)
USING iceberg
-- One partition per calendar day of `datetime` (Iceberg `days` transform)
PARTITIONED BY (days(datetime))

In [22]:
%%sql
-- Show all rows currently in the weather table
SELECT *
FROM climate.weather

datetime,temp,lat,long,cloud_coverage,precip,wind_speed
2023-08-16 00:00:00,76.2,40.951908,-74.075272,Partially sunny,0.0,3.5
2023-08-17 00:00:00,82.5,40.951908,-74.075272,Sunny,0.0,1.2
2023-08-18 00:00:00,70.9,40.951908,-74.075272,Cloudy,0.5,5.2


In [23]:
# Insert three sample observations into climate.weather.
# NOTE: append() is not idempotent -- every re-run of this cell adds the same
# three rows to the table again.
from datetime import datetime

# Reuse the table's own schema so column names and types match the DDL.
schema = spark.table("climate.weather").schema

# Every sample row shares the same coordinates; only the day and the
# weather readings differ.
# Row layout: (datetime, temp, lat, long, cloud_coverage, precip, wind_speed)
station_lat, station_long = 40.951908, -74.075272
data = [
    (datetime(2023, 8, day), temp, station_lat, station_long, sky, precip, wind)
    for day, temp, sky, precip, wind in (
        (16, 76.2, "Partially sunny", 0.0, 3.5),
        (17, 82.5, "Sunny", 0.0, 1.2),
        (18, 70.9, "Cloudy", 0.5, 5.2),
    )
]

df = spark.createDataFrame(data, schema)
df.writeTo("climate.weather").append()

In [24]:
import boto3
from botocore.config import Config
import pyarrow.parquet as pq
from io import BytesIO
from pyspark.sql import SparkSession

# Read every Parquet data file under the table's `data/` prefix straight from
# MinIO and combine them into a single Spark DataFrame.
#
# NOTE(review): this reads the raw files and does not go through the Iceberg
# table metadata, so the result can contain rows from files that the current
# table snapshot no longer references. Expect it to differ from
# `select * from climate.weather` -- confirm against the table's snapshots.

# Initialize the S3 client (credentials are resolved by boto3's default chain)
s3 = boto3.client(
    's3',
    endpoint_url='http://minio:9000',  # Replace with your MinIO server URL
    config=Config(signature_version='s3v4')
)

# Get the Spark session; getOrCreate() returns the notebook's existing session
# when one is already running.
spark = SparkSession.builder \
    .appName("Read Parquet from S3") \
    .getOrCreate()

# Bucket name and folder prefix (path)
bucket_name = 'warehouse'
folder_prefix = 'climate/weather/data/'

# Collect the keys of all Parquet objects under the prefix. A paginator is
# used because a single list_objects_v2 call returns at most one page of keys.
paginator = s3.get_paginator('list_objects_v2')
parquet_keys = [
    obj['Key']
    for page in paginator.paginate(Bucket=bucket_name, Prefix=folder_prefix)
    for obj in page.get('Contents', [])
    if obj['Key'].endswith('.parquet')
]

# Fail with a clear message instead of an IndexError further down.
if not parquet_keys:
    raise FileNotFoundError(
        f"No .parquet objects found under s3://{bucket_name}/{folder_prefix}"
    )

# List to store one Spark DataFrame per Parquet file
df_list = []

for object_key in parquet_keys:
    # Download the whole object into driver memory (fine for small demo files)
    s3_object = s3.get_object(Bucket=bucket_name, Key=object_key)
    parquet_data = s3_object['Body'].read()

    # Decode the Parquet bytes with PyArrow
    table = pq.read_table(BytesIO(parquet_data))

    # PyArrow Table -> pandas DataFrame -> Spark DataFrame
    df_spark = spark.createDataFrame(table.to_pandas())
    df_list.append(df_spark)

# Combine all per-file DataFrames into one. The loop variable is deliberately
# not called `df`, so it does not overwrite the `df` created by the insert cell.
full_df_spark = df_list[0]
for part_df in df_list[1:]:
    full_df_spark = full_df_spark.union(part_df)

# Show the resulting DataFrame
full_df_spark.show()


24/12/22 09:51:17 WARN SparkSession: Using an existing Spark session; only runtime SQL configurations will take effect.


+-------------------+----+---------+----------+---------------+------+----------+
|           datetime|temp|      lat|      long| cloud_coverage|precip|wind_speed|
+-------------------+----+---------+----------+---------------+------+----------+
|2023-08-16 00:00:00|76.2|40.951908|-74.075272|Partially sunny|   0.0|       3.5|
|2023-08-16 00:00:00|76.2|40.951908|-74.075272|Partially sunny|   0.0|       3.5|
|2023-08-16 00:00:00|76.2|40.951908|-74.075272|Partially sunny|   0.0|       3.5|
|2023-08-16 00:00:00|76.2|40.951908|-74.075272|Partially sunny|   0.0|       3.5|
|2023-08-17 00:00:00|82.5|40.951908|-74.075272|          Sunny|   0.0|       1.2|
|2023-08-17 00:00:00|82.5|40.951908|-74.075272|          Sunny|   0.0|       1.2|
|2023-08-17 00:00:00|82.5|40.951908|-74.075272|          Sunny|   0.0|       1.2|
|2023-08-17 00:00:00|82.5|40.951908|-74.075272|          Sunny|   0.0|       1.2|
|2023-08-18 00:00:00|70.9|40.951908|-74.075272|         Cloudy|   0.5|       5.2|
|2023-08-18 00:0