In [1]:
import pyspark
from pyspark.sql import *
from pyspark import SparkContext, SparkConf

from pyspark.sql import *
from pyspark.sql.functions import *

import pandas as pd
import sqlite3

import matplotlib

In [2]:
# Build (or reuse) the Spark session from an explicit configuration.
# Going through the builder's getOrCreate() keeps this cell safe to re-run:
# constructing SparkContext(conf=conf) a second time in a live kernel raises,
# because only one SparkContext may be active at a time.
conf = SparkConf().setAppName("appName").setMaster("local")
spark = SparkSession.builder.config(conf=conf).getOrCreate()
# Expose the session's underlying context under the usual `sc` name.
sc = spark.sparkContext

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
25/02/08 22:46:59 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
25/02/08 22:46:59 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.


In [3]:
import os.path
from os import listdir
from os.path import isfile, join
import sys

import subprocess
import yaml

# Add the parent of the current working directory to the module search path.
parent_dir = os.path.abspath(os.path.join(os.getcwd(), os.pardir))
# Guard the append so re-running this cell does not pile up duplicate entries.
if parent_dir not in sys.path:
    sys.path.append(parent_dir)

print(parent_dir)

/home/ho-yu/bsg


In [4]:
print("Current working directory set to:", os.getcwd())

# Locate config.yaml in the current directory or one level up. When both
# exist, the parent-directory copy takes precedence because it is checked last.
config_path = None
for candidate in ('config.yaml', '../config.yaml'):
    if os.path.exists(candidate):
        config_path = candidate

# Fail with a clear message instead of a NameError on `config` further down.
if config_path is None:
    raise FileNotFoundError(
        "config.yaml not found in " + os.getcwd() + " or its parent directory"
    )

# Read the config.yaml file
with open(config_path, 'r') as file:
    config = yaml.safe_load(file)

# NOTE: `dir` shadows the builtin of the same name; the name is kept because
# the CSV-loading code in this notebook builds its paths from it.
dir = config['fileio']['working_directory2']

print("dir: " + dir)

# Set the current working directory
os.chdir(dir)
print("Current working directory set to:", os.getcwd())

Current working directory set to: /home/ho-yu/bsg/notebooks
dir: /home/ho-yu/bsg
Current working directory set to: /home/ho-yu/bsg


In [5]:
# Load the source CSVs; most `id` / `name` columns are renamed on load to
# table-specific names (person_actions.csv keeps its columns as-is).
# NOTE: only people.csv is read with inferSchema=True. The other files are
# read without it, so their columns (including employees' salary) are strings.
bsg_people = spark.read.csv(dir + "/csv/people.csv", header=True, inferSchema=True) \
    .withColumnRenamed("id", "person_id")

bsg_colonies = spark.read.csv(dir + "/csv/colonies.csv", header=True) \
    .withColumnRenamed("id", "colony_id") \
    .withColumnRenamed("name", "colony_name")

bsg_actions = spark.read.csv(dir + "/csv/person_actions.csv", header=True)

bsg_employees = spark.read.csv(dir + "/csv/employees.csv", header=True) \
    .withColumnRenamed("id", "employee_id") \
    .withColumnRenamed("person_id", "employee_person_id")

bsg_employers = spark.read.csv(dir + "/csv/employers.csv", header=True) \
    .withColumnRenamed("id", "employer_id") \
    .withColumnRenamed("name", "employer_name")
# Backward-compatible alias for the original, misspelled variable name.
bsg_empoyers = bsg_employers

bsg_departments = spark.read.csv(dir + "/csv/departments.csv", header=True) \
    .withColumnRenamed("id", "department_id") \
    .withColumnRenamed("name", "department_name")

# One row per action, carrying the acting person and that person's home colony.
bsg_data = bsg_people \
    .join(bsg_colonies, bsg_people.home_colony_id == bsg_colonies.colony_id) \
    .join(bsg_actions, bsg_people.person_id == bsg_actions.source_person_id)

bsg_data.show()

# Register the joined frame as a temp view so it can be queried with SQL.
bsg_data.createOrReplaceTempView("bsg_data")
# Distinct people per colony (a person appears once per action in bsg_data).
bsg_data_count = spark.sql("SELECT colony_name, COUNT(DISTINCT(person_id)) personCount FROM bsg_data GROUP BY colony_name")

bsg_data_count.show()

+---------+----------+---------+----------+---------+--------------+---------+-----------+---+-----------+-------------------+----------------+----------------+
|person_id|first_name|last_name|salutation|call_sign|home_colony_id|colony_id|colony_name| id|action_name|   action_timestamp|source_person_id|target_person_id|
+---------+----------+---------+----------+---------+--------------+---------+-----------+---+-----------+-------------------+----------------+----------------+
|        1|       Lee|    Adama|       Sir|   Apollo|             4|        4|    Caprica| 85|    forgive|2025-02-08 22:46:29|               1|               5|
|        1|       Lee|    Adama|       Sir|   Apollo|             4|        4|    Caprica| 77|     betray|2025-02-08 22:46:29|               1|               7|
|        1|       Lee|    Adama|       Sir|   Apollo|             4|        4|    Caprica| 75|       save|2025-02-08 22:46:29|               1|               9|
|        1|       Lee|    Adama|  

In [61]:
# Employees joined to their person record, department and home colony.
all_employees = bsg_employees \
    .join(bsg_people, bsg_employees.employee_person_id == bsg_people.person_id) \
    .join(bsg_departments, bsg_employees.dept_id == bsg_departments.department_id) \
    .join(bsg_colonies, bsg_people.home_colony_id == bsg_colonies.colony_id)

all_employees.select(["first_name", "last_name", "department_name", "salary", "colony_name"]).show()

# average salary per department
all_employees.select(["department_name", "salary"]) \
    .groupBy("department_name") \
    .agg(avg("salary").alias("avg_salary")) \
    .sort("avg_salary", ascending=False) \
    .show()

df_cols = ["first_name", "last_name", "salary", "position", "department_name", "colony_name"]
# Selecting by name already yields exactly these column names, so no
# positional rename is needed afterwards.
all_employees_df = all_employees.select(df_cols)

# show() prints the table itself and returns None; wrapping it in print()
# only appended a stray "None" line to the cell output.
all_employees_df.show()

+----------+---------+--------------------+--------+-----------+
|first_name|last_name|     department_name|  salary|colony_name|
+----------+---------+--------------------+--------+-----------+
|       Lee|    Adama| Tactical Operations|120000.0|    Caprica|
|   William|    Adama| Tactical Operations|150000.0|    Caprica|
|      Kara|   Thrace|   Flight Operations| 90000.0|    Caprica|
|     Gaius|   Baltar|Artificial Intell...|100000.0|    Aerilon|
|     Laura|   Roslin|    Executive Branch|200000.0|    Caprica|
|      Saul|     Tigh| Tactical Operations|110000.0|      Earth|
|     Billy|  Keikeya|    Executive Branch| 85000.0|      Picon|
|     Galen|    Tyrol|         Engineering| 95000.0|      Earth|
| Anastasia|   Dualla| Tactical Operations| 87000.0| Sagittaron|
|      Karl|  Agathon|   Flight Operations| 92000.0|    Caprica|
| Samuel T.|   Anders|   Military Strategy|102000.0|      Earth|
+----------+---------+--------------------+--------+-----------+

+--------------------+--

In [63]:
def avg_salary_mapper(row):
    """Turn one employee row into a keyed (salary, count) pair.

    Args:
        row: object exposing ``colony_name`` and ``salary`` attributes.

    Returns:
        ``(colony_name, (salary, 1))`` — the salary is passed through
        unchanged and the ``1`` is the row's contribution to a count.
    """
    colony = row.colony_name
    salary_and_count = (row.salary, 1)
    return colony, salary_and_count


# Average salary per colony via the RDD API, lowest average first.
(
    all_employees_df.rdd
    # key each row by colony with a (salary, 1) payload
    .map(avg_salary_mapper)
    # add up salaries and counts per colony
    .reduceByKey(
        lambda left, right: (
            float(left[0]) + float(right[0]),
            float(left[1]) + float(right[1]),
        )
    )
    # total / count -> mean
    .mapValues(lambda total_and_count: float(total_and_count[0]) / float(total_and_count[1]))
    .sortBy(lambda pair: pair[1], ascending=True)
    .collect()
)

[('Picon', 85000.0),
 ('Sagittaron', 87000.0),
 ('Aerilon', 100000.0),
 ('Earth', 102333.33333333333),
 ('Caprica', 130400.0)]

In [None]:
# Caprica employees as (full name, salary, position, department, colony),
# ordered by salary, lowest first.
# The salary values are strings here, so the sort key converts them to float;
# sorting the raw strings would be lexicographic (e.g. '90000.0' would land
# after '150000.0').
all_employees_df.rdd \
    .map(lambda s: (s.first_name + " " + s.last_name, s.salary, s.position, s.department_name, s.colony_name)) \
    .filter(lambda s: s[4] == "Caprica") \
    .sortBy(lambda s: float(s[1]), ascending=True) \
    .collect()

[('Samuel T. Anders',
  '102000.0',
  'Resistance Leader',
  'Military Strategy',
  'Earth'),
 ('Saul Tigh',
  '110000.0',
  'Executive Officer',
  'Tactical Operations',
  'Earth'),
 ('Lee Adama', '120000.0', 'Commander', 'Tactical Operations', 'Caprica'),
 ('William Adama',
  '150000.0',
  'Fleet Admiral',
  'Tactical Operations',
  'Caprica'),
 ('Laura Roslin', '200000.0', 'President', 'Executive Branch', 'Caprica')]