In [1]:
from typing import Optional

from pyspark.sql import Row, SparkSession
from pyspark.sql.functions import avg, col, row_number, udf, when
from pyspark.sql.types import IntegerType
from pyspark.sql.window import Window

In [None]:
# IP address of the Spark master container, read from a local text file.
# The file is presumably populated from the output of:
#   docker inspect -f '{{range.NetworkSettings.Networks}}{{.IPAddress}}{{end}}' spark_spark_1

with open("remote_ip.txt") as f:
    # Strip surrounding whitespace: a trailing newline in the file would
    # otherwise be embedded in the f"spark://{scheduler_ip}:7077" master URL
    # and make it invalid.
    scheduler_ip = f.read().strip()

## GCP
This section currently uses a local PySpark session to connect to my existing GCP resources. The intention is to run against local containers instead of submitting jobs to Dataproc.

In [2]:
# Local-mode session ("local[*]") with the BigQuery and GCS connector jars
# listed in spark.jars.
spark = (
    SparkSession.builder
    .appName("gcp")
    # .config("spark.master", f"spark://{scheduler_ip}:7077")
    .config(
        "spark.jars",
        "spark-3.3-bigquery-0.32.2.jar,gcs-connector-hadoop3-latest.jar",
    )
    .config("spark.master", "local[*]")
    .getOrCreate()
)

# Hadoop-level settings: register the gs:// filesystem implementation and
# enable service-account authentication using the JSON key file.
_hadoop_conf = spark._jsc.hadoopConfiguration()
_hadoop_conf.set(
    "fs.gs.impl",
    "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem",
)
_hadoop_conf.set("fs.gs.auth.service.account.enable", "true")
_hadoop_conf.set(
    "google.cloud.auth.service.account.json.keyfile",
    "methodical-aura-373904-585595bbc316.json",
)

In [3]:
# Read the CSV from the GCS bucket, taking column names from its first line,
# then print a preview.
_employers_csv = "gs://mark-dbt-learn-bucket/Worlds_Best_Employers.csv"
df = spark.read.csv(_employers_csv, header=True)
df.show()

+----+-------------------+--------------------+-----------------+---------+
|RANK|               NAME|          INDUSTRIES|COUNTRY/TERRITORY|EMPLOYEES|
+----+-------------------+--------------------+-----------------+---------+
|  1.|Samsung Electronics|Semiconductors, E...|       South Kore|  266,673|
|  2.|          Microsoft|IT, Internet, Sof...|    United States|  221,000|
|  3.|                IBM|Semiconductors, E...|    United States|  250,000|
|  4.|           Alphabet|IT, Internet, Sof...|    United States|  156,500|
|  5.|              Apple|Semiconductors, E...|    United States|  154,000|
|  6.|    Delta Air Lines|Transportation an...|    United States|   80,000|
|  7.|   Costco Wholesale|Retail and Wholesale|    United States|  288,000|
|  8.|              Adobe|IT, Internet, Sof...|    United States|   25,988|
|  9.| Southwest Airlines|Transportation an...|    United States|   55,093|
| 10.|  Dell Technologies|Semiconductors, E...|    United States|  133,000|
| 11.|    Lo

In [4]:
# Set the session-level `temporaryGcsBucket` option.
# NOTE(review): presumably used by the BigQuery connector for staging — confirm.
bucket = "dataproc-temp-us-east1-49528533322-xcoaaowy"
spark.conf.set("temporaryGcsBucket", bucket)

In [6]:
# Load the `raw_insurance_claims` table from the `comp653` dataset through the
# "bigquery" data source and preview the first five rows.
# NOTE(review): the name `words` does not describe insurance claims; it is kept
# as-is because other cells may reference it.
words = (
    spark.read.format("bigquery")
    .option("parentProject", "methodical-aura-373904")
    .option("dataset", "comp653")
    .option("table", "raw_insurance_claims")
    .load()
)

words.show(5)

+------------------+---+-------------+----------------+------------+----------+-----------------+---------------------+--------------+-----------+-----------+-----------------------+------------------+---------------+--------------------+-------------+------------+-------------+-------------+--------------+-----------------+---------------------+--------------+-------------+------------------+------------------------+---------------------------+---------------+---------------+---------+-----------------------+------------------+------------+--------------+-------------+---------+----------+---------+--------------+----+
|months_as_customer|age|policy_number|policy_bind_date|policy_state|policy_csl|policy_deductable|policy_annual_premium|umbrella_limit|insured_zip|insured_sex|insured_education_level|insured_occupation|insured_hobbies|insured_relationship|capital_gains|capital_loss|incident_date|incident_type|collision_type|incident_severity|authorities_contacted|incident_state|incident_

In [None]:
# Stop the local-mode Spark session used for the GCP section. A later
# getOrCreate() would otherwise return this same session instead of building
# a new one.
spark.stop()

## Local

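Uses the Spark cluster stood up via docker-compose. The master's address is the `scheduler_ip` value read from `remote_ip.txt` earlier in the notebook.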

In [None]:
# Reference: https://blog.det.life/pyspark-or-polars-what-should-you-use-breakdown-of-similarities-and-differences-b261a825b9d6

# Build (or reuse) a session whose master is spark://<scheduler_ip>:7077.
spark = (
    SparkSession.builder
    .appName("Test")
    .config("spark.master", f"spark://{scheduler_ip}:7077")
    .getOrCreate()
)

In [None]:
# Toy inputs sharing an `id` key: (id, age, salary) rows and (id, city) rows.
data1 = [
    Row(id=row_id, age=age, salary=salary)
    for row_id, age, salary in (
        (1, 25, 50000),
        (2, 30, 55000),
        (3, 35, 60000),
        (4, 40, 65000),
    )
]
data2 = [
    Row(id=row_id, city=city)
    for row_id, city in (
        (1, "New York"),
        (2, "San Francisco"),
        (3, "Los Angeles"),
        (4, "Chicago"),
    )
]

# Turn each list of Rows into a Spark DataFrame.
df1_pyspark = spark.createDataFrame(data1)
df2_pyspark = spark.createDataFrame(data2)

In [8]:
# Perform operations
selected_df = df1_pyspark.select("id", "salary")
filtered_df = selected_df.filter(col("salary") > 50000)
renamed_df = filtered_df.withColumnRenamed("salary", "income")
joined_df = renamed_df.join(df2_pyspark, on="id", how="inner")
conditional_df = joined_df.withColumn("high_income", when(col("income") > 60000, 1).otherwise(0))
conditional_df.show()

+---+------+-------------+-----------+
| id|income|         city|high_income|
+---+------+-------------+-----------+
|  2| 55000|San Francisco|          0|
|  3| 60000|  Los Angeles|          0|
|  4| 65000|      Chicago|          1|
+---+------+-------------+-----------+



In [9]:
def salary_increase(salary: Optional[int]) -> Optional[int]:
    """Return ``salary`` raised by a flat 5000.

    Args:
        salary: Current salary, or ``None`` for a missing value.

    Returns:
        ``salary + 5000``, or ``None`` when ``salary`` is ``None``.
    """
    # When this function is used as a Spark UDF, SQL NULL arrives as None.
    # Without this guard the addition raises TypeError and fails the job.
    if salary is None:
        return None
    return salary + 5000


# Wrap the plain Python function as a Spark UDF with an integer return type,
# and apply it to `income` to produce `increased_income`.
salary_increase_udf = udf(salary_increase, IntegerType())
udf_applied_df = conditional_df.withColumn(
    "increased_income", salary_increase_udf(col("income"))
)

# The window has an ordering but no partitionBy, so row_number() numbers the
# rows of the whole DataFrame in `id` order.
window_spec = Window.orderBy("id")
ranked_df = udf_applied_df.withColumn("rank", row_number().over(window_spec))

# GroupBy and aggregation: per-city averages, highest average income first.
# The "rank" column is not aggregated, so it does not appear in the result.
result_df = (
    ranked_df.groupBy("city")
    .agg(
        avg("income").alias("average_income"),
        avg("increased_income").alias("average_increased_income"),
    )
    .sort("average_income", ascending=False)
)

# Show the resulting DataFrame
result_df.show()

+-------------+--------------+------------------------+
|         city|average_income|average_increased_income|
+-------------+--------------+------------------------+
|      Chicago|       65000.0|                 70000.0|
|  Los Angeles|       60000.0|                 65000.0|
|San Francisco|       55000.0|                 60000.0|
+-------------+--------------+------------------------+



In [4]:
# Stop the Spark session created in the "Local" section.
spark.stop()