In [None]:
!pip install pyspark
!pip install cassandra-driver

In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, LongType
from pyspark.sql.functions import col, isnan, when, count, avg, first, regexp_replace

# Spark session used by every cell below.
spark = SparkSession.builder.appName("example").getOrCreate()

# Survey columns as (name, type) pairs, listed in the order expected in raw_data.csv; every field is nullable.
# NOTE(review): PreviousSalary is read as text even though a later cell averages it — confirm the raw values are numeric.
survey_fields = [
    ("Id", IntegerType()),
    ("Age", StringType()),
    ("Accessibility", StringType()),
    ("EdLevel", StringType()),
    ("Employment", IntegerType()),
    ("Gender", StringType()),
    ("MentalHealth", StringType()),
    ("MainBranch", StringType()),
    ("YearsCode", IntegerType()),
    ("YearsCodePro", IntegerType()),
    ("Country", StringType()),
    ("PreviousSalary", StringType()),
    ("HaveWorkedWith", StringType()),
    ("ComputerSkills", IntegerType()),
    ("Employed", IntegerType()),
]
survey_schema = StructType([StructField(field_name, field_type, True) for field_name, field_type in survey_fields])
df = spark.read.csv("./raw_data.csv", header=True, schema=survey_schema)

# Country Codes, Courtesy of https://raw.githubusercontent.com/lukes/ISO-3166-Countries-with-Regional-Codes/master/all/all.csv
# Column types are inferred from the file; only the country name and its numeric code are kept,
# renamed to Country / CountryCode so they can be joined against the survey data.
cdf = spark.read.csv("./country_codes.csv", header=True, inferSchema=True)
cdf = cdf.select("name", "country-code").withColumnsRenamed({"name": "Country", "country-code": "CountryCode"})
cdf.show(truncate=True)

# Output dataframe: starts with only a Country column; per-country metrics are joined onto it later.
output = spark.createDataFrame([], StructType([
    StructField("Country", StringType(), True)
]))

# Punctuation stripped from country names on BOTH sides (survey and ISO list) so the names line up.
# Raw string: the original non-raw literal relied on invalid escapes like "\." being passed through.
COUNTRY_PUNCTUATION = r"[\.,\()\-_']"

# Keep the original column order (a set() difference reorders columns differently on each run);
# Country is moved to the end after normalisation.
cols = [c for c in df.columns if c != "Country"]
# Anchored so only the exact name "Republic of Korea" is renamed, not names that merely contain it.
normalized_country = regexp_replace(
    regexp_replace("Country", "^Republic of Korea$", "South Korea"),
    COUNTRY_PUNCTUATION,
    "",
)
df = df.select(*cols, normalized_country.alias("Country"))

# Distinct (normalised) survey countries.
tdf = df.groupBy("Country").count().select("Country")
cdf = cdf.select("CountryCode", regexp_replace("Country", COUNTRY_PUNCTUATION, "").alias("Country"))

# Manual (normalised survey name, numeric country code) pairs for survey countries whose names do not
# match the ISO list after punctuation stripping. These rows are merged into cdf before the join on Country.
patch = [
    ("Bolivia", 68),
    ("Cape Verde", 132),
    ("Congo Republic of the", 178),  # Republic of the Congo; 180 is the Democratic Republic of the Congo
    ("Czech Republic", 203),
    ("Democratic Republic of the Congo", 180),
    ("Hong Kong SAR", 344),
    ("Libyan Arab Jamahiriya", 434),
    ("Palestine", 275),
    ("South Korea", 410),
    ("Republic of Moldova", 498),
    ("Taiwan", 158),
    ("United Republic of Tanzania", 834),
    ("The former Yugoslav Republic of Macedonia", 807)
]
# Schema for the manual patch rows; same column names as cdf (Country, CountryCode).
patch_schema = StructType(
    [StructField(field_name, field_type, True)
     for field_name, field_type in (("Country", StringType()), ("CountryCode", IntegerType()))]
)
patch_df = spark.createDataFrame(patch, patch_schema)

# Full outer join on both columns keeps every (Country, CountryCode) pair from either side,
# effectively appending the patch rows to the ISO list.
cdf = cdf.join(patch_df, ["Country", "CountryCode"], "outer")

# Survey countries that also have a country code; survey countries with no match (or a null name) are dropped.
# Joining tdf to cdf directly gives the same Country, CountryCode rows as outer-joining an empty frame first.
output = tdf.join(cdf, on="Country", how="inner")

# Columns from position 6 onward; which columns these are depends on df's current column order.
selected_columns = df.select(df.columns[6:])

In [None]:
# Per-country mean of each numeric survey field, exposed as Avg_<field>.
# NOTE(review): PreviousSalary is StringType in survey_schema, so its mean relies on Spark coercing text to numbers.
columns_to_average = ["Employment", "YearsCode", "YearsCodePro", "PreviousSalary", "ComputerSkills", "Employed"]
average_exprs = [avg(col(field)).alias("Avg_" + field) for field in columns_to_average]
result_df = df.groupBy("Country").agg(*average_exprs)

result_df.show(truncate=True)
output = output.join(result_df, on="Country", how="inner")

In [None]:
# Respondents per country split into "Man" / "Woman" columns; a country with none of a value gets 0.
result_df = (
    df.groupBy("Country", "Gender")
    .count()
    .withColumnRenamed("count", "Count")
)
result_pivoted = (
    result_df.groupBy("Country")
    .pivot("Gender", ["Man", "Woman"])
    .agg(first("Count").alias("Count"))
    .fillna(0)
)

result_pivoted.show(truncate=True)
output = output.join(result_pivoted, on="Country", how="inner")

In [None]:
# Respondents per country split into the "<35" / ">35" age buckets; missing buckets become 0.
age_buckets = ["<35", ">35"]
result_df = (
    df.groupBy("Country", "Age")
    .count()
    .withColumnRenamed("count", "Count")
)
result_pivoted = (
    result_df.groupBy("Country")
    .pivot("Age", age_buckets)
    .agg(first("Count").alias("Count"))
    .fillna(0)
)

result_pivoted.show(truncate=True)
output = output.join(result_pivoted, on="Country", how="inner")

In [None]:
# Respondents per country split into "Dev" / "NotDev" (MainBranch); missing values become 0.
branch_values = ["Dev", "NotDev"]
result_df = (
    df.groupBy("Country", "MainBranch")
    .count()
    .withColumnRenamed("count", "Count")
)
result_pivoted = (
    result_df.groupBy("Country")
    .pivot("MainBranch", branch_values)
    .agg(first("Count").alias("Count"))
    .fillna(0)
)

result_pivoted.show(truncate=True)
output = output.join(result_pivoted, on="Country", how="inner")

In [None]:
# Respondents per country answering "Yes" / "No" to MentalHealth, renamed to descriptive column names.
result_df = (
    df.groupBy("Country", "MentalHealth")
    .count()
    .withColumnRenamed("count", "Count")
)
result_pivoted = (
    result_df.groupBy("Country")
    .pivot("MentalHealth", ["Yes", "No"])
    .agg(first("Count").alias("Count"))
    .fillna(0)
    .withColumnsRenamed({"Yes": "MentalHealthIssuesYes", "No": "MentalHealthIssuesNo"})
)

result_pivoted.show(truncate=False)
output = output.join(result_pivoted, on="Country", how="inner")

In [None]:
# Known education levels; anything else (including nulls, where isin() yields null) is folded into "Other".
education_levels = ['Master', 'Undergraduate', 'Other', 'PhD', 'NoHigherEd']
df = df.withColumn("EdLevel", when(col("EdLevel").isin(education_levels), col("EdLevel")).otherwise("Other"))

result_df = df.groupBy("Country", "EdLevel").agg(count("*").alias("Count"))
# Explicit pivot values guarantee one column per level even if a level never occurs in the data
# (the Cassandra insert reads row.PhD, row.Master, ... by name). It also skips Spark's extra
# distinct-values job. sorted() matches the order Spark uses when it discovers the values itself.
result_pivoted = result_df.groupBy("Country").pivot("EdLevel", sorted(education_levels)).agg(first("Count").alias("Count"))
result_pivoted = result_pivoted.fillna(0)

result_pivoted.show(truncate=False)
output = output.join(result_pivoted, on="Country", how="inner")

In [None]:
from cassandra.cluster import Cluster
from cassandra.auth import PlainTextAuthProvider
from cassandra.query import BatchStatement
import json

# Credentials are kept out of the notebook; the file must provide "user" and "pass" keys.
with open('credentials.json') as f:
    creds = json.load(f)

auth_provider = PlainTextAuthProvider(username=creds['user'], password=creds['pass'])
cluster = Cluster(['demo-dc1-all-pods-service.k8ssandra-operator.svc.cluster.local'], auth_provider=auth_provider)

# One INSERT template with %s placeholders; the column order must match the values tuple below.
insert_cql = (
    "INSERT INTO employment (country, code, avgEmployment, avgYearsCode, avgYearsCodePro, "
    "avgPreviousSalary, avgComputerSkills, men, women, below35, above35, dev, notDev, "
    "mentalHealthIssuesYes, mentalHealthIssuesNo, phd, master, undergraduate, noHigherEd, otherEd) "
    "VALUES (" + ", ".join(["%s"] * 20) + ")"
)

try:
    session = cluster.connect()
    session.set_keyspace('prod')
    rows = output.collect()

    # A single logged batch (BatchStatement's default type), as with the original BEGIN BATCH ... APPLY BATCH.
    # The driver binds the values instead of str.format(): strings are quoted/escaped and None
    # (e.g. an average over only-null values) becomes CQL null rather than the invalid token `None`.
    # NOTE(review): Cassandra rejects batches above its configured size threshold; if `output` grows,
    # consider per-row writes instead of one batch.
    batch = BatchStatement()
    for row in rows:
        batch.add(insert_cql, (
            row.Country,
            row.CountryCode,
            row.Avg_Employment,
            row.Avg_YearsCode,
            row.Avg_YearsCodePro,
            row.Avg_PreviousSalary,
            row.Avg_ComputerSkills,
            row.Man,
            row.Woman,
            row["<35"],
            row[">35"],
            row.Dev,
            row.NotDev,
            row.MentalHealthIssuesYes,
            row.MentalHealthIssuesNo,
            row.PhD,
            row.Master,
            row.Undergraduate,
            row.NoHigherEd,
            row.Other,
        ))

    # Nothing to write when no country survived the joins.
    if rows:
        session.execute(batch)
finally:
    # Release the driver's connections and background threads even if the write fails.
    cluster.shutdown()