### Upload the raw CSV file to AWS S3

In [None]:
import boto3

# S3 client built from boto3's default credential/region resolution.
s3 = boto3.client('s3')

# Local source file and its destination object under the bucket's raw/ prefix.
bucket_name = 'mypracawsbucketsc2'
file_path = 'yellow_tripdata_2015-01.csv'
s3_key = 'raw/' + file_path

# Blocking upload of the local file to s3://<bucket_name>/<s3_key>.
s3.upload_file(Filename=file_path, Bucket=bucket_name, Key=s3_key)
print("Upload completed.")

### Create a Spark session configured for AWS S3 (s3a://)

In [None]:
import os
import findspark

# Maven coordinates of the extra JVM packages this notebook needs, defined once so
# PYSPARK_SUBMIT_ARGS and the session config cannot drift apart:
#  - hadoop-aws provides the s3a:// filesystem (S3AFileSystem).
#  - aws-java-sdk-bundle is the AWS SDK hadoop-aws is built against; its version must
#    match the hadoop-aws release (hadoop-aws 3.3.1 -> aws-java-sdk-bundle 1.11.901).
#  - postgresql is a JDBC driver, presumably for a later load step (not used in the
#    cells shown here).
# NOTE(review): hadoop-aws must also match the Hadoop version bundled with the
# installed Spark distribution — confirm against $SPARK_HOME/jars/hadoop-*.jar.
SPARK_PACKAGES = [
    "org.apache.hadoop:hadoop-aws:3.3.1",
    "com.amazonaws:aws-java-sdk-bundle:1.11.901",
    "org.postgresql:postgresql:42.2.27",
]

# PySpark reads PYSPARK_SUBMIT_ARGS when it launches the JVM, so this must be set
# before the first SparkSession is created in this kernel. The trailing
# "pyspark-shell" token is required by PySpark's launcher.
os.environ["PYSPARK_SUBMIT_ARGS"] = f"--packages {','.join(SPARK_PACKAGES)} pyspark-shell"

# Locate the Spark installation and make `pyspark` importable.
findspark.init()

# Must come after findspark.init(), which is what puts pyspark on sys.path.
from pyspark.sql import SparkSession


def create_spark_session(app_name="S3 CSV Reader"):
    """Create (or reuse) a SparkSession that can read and write s3a:// paths.

    Args:
        app_name: Application name shown in the Spark UI.

    Returns:
        The active SparkSession. If a session already exists in this kernel,
        ``getOrCreate`` returns it unchanged, and JVM-level settings such as
        ``spark.jars.packages`` only take effect after a kernel restart.
    """
    spark_conf = {
        # Same list as PYSPARK_SUBMIT_ARGS; when both are given, --packages wins.
        "spark.jars.packages": ",".join(SPARK_PACKAGES),
        "spark.hadoop.fs.s3a.impl": "org.apache.hadoop.fs.s3a.S3AFileSystem",
        # AWS SDK v1 default chain: env vars, ~/.aws profile, instance role, etc.
        "spark.hadoop.fs.s3a.aws.credentials.provider": "com.amazonaws.auth.DefaultAWSCredentialsProviderChain",
    }

    builder = SparkSession.builder.appName(app_name)
    for key, value in spark_conf.items():
        builder = builder.config(key, value)

    return builder.getOrCreate()


spark = create_spark_session()

### Read CSV file

In [None]:
# Raw taxi-trip CSV under the bucket's raw/ prefix (the object the upload cell writes).
# inferSchema costs one extra pass over the data to guess column types.
# TODO: consider an explicit schema for large inputs.
df = (
    spark.read
    .options(header=True, inferSchema=True)
    .csv("s3a://mypracawsbucketsc2/raw/yellow_tripdata_2015-01.csv")
)


### Transform

In [None]:
from pyspark.sql.functions import col, to_timestamp

# Pickup/dropoff datetime columns to convert to Spark's timestamp type.
_TIMESTAMP_COLUMNS = ("tpep_pickup_datetime", "tpep_dropoff_datetime")

# Always start from the raw frame so re-running this cell gives the same result.
df_clean = df
for _ts_col in _TIMESTAMP_COLUMNS:
    # No explicit pattern: Spark's default timestamp parsing is used.
    df_clean = df_clean.withColumn(_ts_col, to_timestamp(col(_ts_col)))

# Drop every row with a null in any column (dropna's default how="any").
# NOTE(review): with ANSI mode off, to_timestamp yields null for unparseable
# values, so those rows are removed here too — confirm that is intended.
df_clean = df_clean.dropna()

### Aggregate: average total amount per vendor

In [None]:
from pyspark.sql.functions import avg

# Mean of total_amount for each VendorID.
# Alias the aggregate explicitly. The default name "avg(total_amount)" contains
# parentheses, which Spark's Parquet writer rejects on older releases (e.g. 3.2.x:
# "contains invalid character(s) ... Please use alias to rename it").
agg_df = df_clean.groupBy("VendorID").agg(avg("total_amount").alias("avg_total_amount"))

### Save as parquet to S3


In [None]:
# Persist the per-vendor averages under the bucket's processed/ prefix.
# mode="overwrite" replaces output from a previous run, so the cell can be re-run.
agg_df.write.parquet(
    "s3a://mypracawsbucketsc2/processed/avg_amount_per_vendor.parquet",
    mode="overwrite",
)