In [None]:
from pyspark import SparkConf
from pyspark.sql import SparkSession


# Avoid hard-coded credentials: read the access key and secret key from /home/$USER/creds.txt instead.
import os
import sys

# Load the S3/MinIO access key and secret key from a comma-separated
# creds.txt file ("<accesskey>,<secretkey>") under /home/$USER.
# On any read/format problem the notebook keeps running with empty
# credentials, so later s3a:// reads and writes will be rejected.
creds_path = f"/home/{os.getenv('USER')}/creds.txt"
try:
    # `with` guarantees the file handle is closed even if parsing fails.
    with open(creds_path, "r") as creds_handle:
        # Strip each field so "key, secret" or a trailing newline does not
        # leak whitespace into the credentials.
        creds_file = [part.strip() for part in creds_handle.read().split(",")]
    # IndexError here means the file held fewer than two comma-separated fields.
    accesskey, secretkey = creds_file[0], creds_file[1]
except (OSError, IndexError, UnicodeDecodeError) as creds_error:
    # Only expected failures are handled (missing/unreadable file, malformed
    # content); KeyboardInterrupt and programming errors are no longer swallowed.
    print(
        f"Could not load credentials from {creds_path} ({creds_error!r}), "
        "you can't access minio"
    )
    accesskey, secretkey = "", ""

# All Spark/Hadoop settings for this session, gathered in one place.
spark_settings = [
    # Pull in the hadoop-aws package that provides the s3a:// filesystem.
    ("spark.jars.packages", "org.apache.hadoop:hadoop-aws:3.2.3"),
    # Authenticate with the static access/secret key pair loaded above.
    (
        "spark.hadoop.fs.s3a.aws.credentials.provider",
        "org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider",
    ),
    ("spark.hadoop.fs.s3a.access.key", accesskey),
    ("spark.hadoop.fs.s3a.secret.key", secretkey),
    # S3A options for talking to MinIO, following:
    # https://medium.com/@dineshvarma.guduru/reading-and-writing-data-from-to-minio-using-spark-8371aefa96d2
    ("spark.hadoop.fs.s3a.path.style.access", "true"),
    ("spark.hadoop.fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem"),
    # Enable the S3A "magic" committer, following:
    # https://github.com/minio/training/blob/main/spark/taxi-data-writes.py
    # https://spot.io/blog/improve-apache-spark-performance-with-the-s3-magic-committer/
    ("spark.hadoop.fs.s3a.committer.magic.enabled", "true"),
    ("spark.hadoop.fs.s3a.committer.name", "magic"),
    # Internal address of the S3 cluster proxy.
    ("spark.hadoop.fs.s3a.endpoint", "http://system54.rice.iit.edu"),
    # Driver and executor memory.
    ("spark.driver.memory", "4g"),
    ("spark.executor.memory", "4g"),
]

# Build the configuration and point it at the Spark cluster master.
conf = SparkConf().setAll(spark_settings).setMaster("spark://sm.service.consul:7077")

# Create (or reuse) the session for the "THY" application; the driver host is
# set on the builder before the shared configuration is applied.
session_builder = SparkSession.builder.appName("THY")
session_builder = session_builder.config(
    "spark.driver.host", "spark-edge.service.consul"
)
session_builder = session_builder.config(conf=conf)
spark = session_builder.getOrCreate()

In [None]:
from pyspark.sql.types import *
from pyspark.sql.functions import *


# Raise the driver's log verbosity to INFO.
sc = spark.sparkContext
sc.setLogLevel("INFO")

# Read the raw weather records from MinIO; every field below is carved out of
# the `_c0` column of this frame.
df = spark.read.csv("s3a://itmd521/50.txt")

raw_line = df["_c0"]
as_int = IntegerType()

# (output column name, expression) pairs, applied in this order.
# Each expression is raw_line.substr(start, length) plus a cast and, for some
# fields, a scale factor.
# NOTE(review): DewPoint is not divided by 10, unlike AirTemperature and
# AtmosphericPressure — confirm whether that is intentional.
field_specs = [
    ("WeatherStation", raw_line.substr(5, 6)),
    ("WBAN", raw_line.substr(11, 5)),
    ("ObservationDate", to_date(raw_line.substr(16, 8), "yyyyMMdd")),
    ("ObservationHour", raw_line.substr(24, 4).cast(as_int)),
    ("Latitude", raw_line.substr(29, 6).cast("float") / 1000),
    ("Longitude", raw_line.substr(35, 7).cast("float") / 1000),
    ("Elevation", raw_line.substr(47, 5).cast(as_int)),
    ("WindDirection", raw_line.substr(61, 3).cast(as_int)),
    ("WDQualityCode", raw_line.substr(64, 1).cast(as_int)),
    ("SkyCeilingHeight", raw_line.substr(71, 5).cast(as_int)),
    ("SCQualityCode", raw_line.substr(76, 1).cast(as_int)),
    ("VisibilityDistance", raw_line.substr(79, 6).cast(as_int)),
    ("VDQualityCode", raw_line.substr(86, 1).cast(as_int)),
    ("AirTemperature", raw_line.substr(88, 5).cast("float") / 10),
    ("ATQualityCode", raw_line.substr(93, 1).cast(as_int)),
    ("DewPoint", raw_line.substr(94, 5).cast("float")),
    ("DPQualityCode", raw_line.substr(99, 1).cast(as_int)),
    ("AtmosphericPressure", raw_line.substr(100, 5).cast("float") / 10),
    ("APQualityCode", raw_line.substr(105, 1).cast(as_int)),
]

# Append one typed column per spec, then discard the raw line column.
parsed_df = df
for field_name, field_value in field_specs:
    parsed_df = parsed_df.withColumn(field_name, field_value)
splitDF = parsed_df.drop("_c0")

# Show the resulting schema and the first 10 parsed rows.
splitDF.printSchema()
splitDF.show(10)

In [None]:
# Write the parsed frame back to object storage as CSV with a header row,
# replacing whatever is already at the destination.
csv_destination = "s3a://tyu17/50.csv"
csv_writer = splitDF.write.mode("overwrite").option("header", "true")
csv_writer.csv(csv_destination)

In [None]:
%%capture thyapp
# Mean and standard deviation of air temperature for each (year, month).
# NOTE(review): no rows are filtered before aggregating — if the source data
# encodes missing temperatures with a sentinel value, it is included in these
# statistics; confirm against the data format.

# Step 1: keep only the month, the year and the temperature reading.
monthly_readings = splitDF.select(
    month("ObservationDate").alias("Month"),
    year("ObservationDate").alias("Year"),
    col("AirTemperature").alias("Temperature"),
)

# Step 2: aggregate per month/year and sort chronologically.
temperature_stats = [avg("Temperature"), stddev("Temperature")]
grouped_readings = monthly_readings.groupBy("Month", "Year")
avg_df = grouped_readings.agg(*temperature_stats).orderBy("Year", "Month")

# Print the first 10 rows.
avg_df.show(10)


In [None]:
# Stop the Spark session now that all work in this notebook has finished.
spark.stop()