In [1]:
import requests
from config import API_KEY
import json
from pyspark.sql import SparkSession as ss


In [4]:
data = [1,2,3,4,5]
spark = ss.builder.master("local[1]") \
                    .appName('HelloSpark') \
                    .getOrCreate()

In [17]:
spark

In [16]:
import platform
print(platform.java_ver())

('', '', ('', '', ''), ('', '', ''))


In [8]:
sc = spark.sparkContext
rdd = sc.parallelize([1,2,3,4,5], 2)
rdd.collect()

[1, 2, 3, 4, 5]

Read in csv files

In [90]:
df = spark.read.csv("../../data/datasets/ds_salaries.csv", header=True)
df.show()

+---------+----------------+---------------+--------------------+------+---------------+-------------+------------------+------------+----------------+------------+
|work_year|experience_level|employment_type|           job_title|salary|salary_currency|salary_in_usd|employee_residence|remote_ratio|company_location|company_size|
+---------+----------------+---------------+--------------------+------+---------------+-------------+------------------+------------+----------------+------------+
|     2023|              SE|             FT|Principal Data Sc...| 80000|            EUR|        85847|                ES|         100|              ES|           L|
|     2023|              MI|             CT|         ML Engineer| 30000|            USD|        30000|                US|         100|              US|           S|
|     2023|              MI|             CT|         ML Engineer| 25500|            USD|        25500|                US|         100|              US|           S|
|     2023

In [91]:
# See Columns and Types
df.printSchema()
df.columns

# See Data Vertically
df.show(1, vertical=True)

root
 |-- work_year: string (nullable = true)
 |-- experience_level: string (nullable = true)
 |-- employment_type: string (nullable = true)
 |-- job_title: string (nullable = true)
 |-- salary: string (nullable = true)
 |-- salary_currency: string (nullable = true)
 |-- salary_in_usd: string (nullable = true)
 |-- employee_residence: string (nullable = true)
 |-- remote_ratio: string (nullable = true)
 |-- company_location: string (nullable = true)
 |-- company_size: string (nullable = true)

-RECORD 0----------------------------------
 work_year          | 2023                 
 experience_level   | SE                   
 employment_type    | FT                   
 job_title          | Principal Data Sc... 
 salary             | 80000                
 salary_currency    | EUR                  
 salary_in_usd      | 85847                
 employee_residence | ES                   
 remote_ratio       | 100                  
 company_location   | ES                   
 company_size    

In [59]:

# DF Length
df.count()

3755

Select DF column, Get distinct values

In [62]:
df.select('job_title').distinct().show()

+--------------------+
|           job_title|
+--------------------+
|3D Computer Visio...|
|  Lead Data Engineer|
|        Data Modeler|
| Data Scientist Lead|
|Principal Data Ar...|
|Head of Machine L...|
|Machine Learning ...|
|Data Analytics Sp...|
|     Data Specialist|
|Data Operations E...|
|Deep Learning Res...|
| Data Analytics Lead|
|  Power BI Developer|
|Machine Learning ...|
|   Lead Data Analyst|
|        BI Developer|
|Staff Data Scientist|
|       ETL Developer|
|           Data Lead|
|Product Data Scie...|
+--------------------+
only showing top 20 rows



In [92]:
from pyspark.sql.functions import col

# Simply select the column
df.select(col('salary').cast("int")).describe().show()

# Mutate the dataframe
df.select('salary').describe().show()
df = df.withColumn("salary", col('salary').cast("int"))
df.select('salary').describe().show()

+-------+------------------+
|summary|            salary|
+-------+------------------+
|  count|              3755|
|   mean|190695.57177097205|
| stddev| 671676.5005079063|
|    min|              6000|
|    max|          30400000|
+-------+------------------+

+-------+------------------+
|summary|            salary|
+-------+------------------+
|  count|              3755|
|   mean|190695.57177097205|
| stddev| 671676.5005079063|
|    min|             10000|
|    max|             99750|
+-------+------------------+

+-------+------------------+
|summary|            salary|
+-------+------------------+
|  count|              3755|
|   mean|190695.57177097205|
| stddev| 671676.5005079063|
|    min|              6000|
|    max|          30400000|
+-------+------------------+



Change column type, introduce 'year' function / type

In [159]:
from pyspark.sql.types import DateType
from pyspark.sql.functions import month, year

# Change multiple columns
# df.printSchema()
# df.show()
int_cols = ['salary', 'salary_in_usd', 'remote_ratio']
df_tmp = df

# df_tmp = df_tmp.select(*(col(c).cast("int") for c in int_cols))
for c in int_cols: 
    df_tmp = df_tmp.withColumn(c, col(c).cast('int'))


df_tmp = df_tmp.withColumns({'salary'       : col('salary').cast("int"),
                             'salary_in_usd': col('salary_in_usd').cast("int"),
                             'remote_ratio' : col('remote_ratio').cast("int"),
                             'work_year'    : year(col('work_year'))})


df_tmp.printSchema()


root
 |-- work_year: integer (nullable = true)
 |-- experience_level: string (nullable = true)
 |-- employment_type: string (nullable = true)
 |-- job_title: string (nullable = true)
 |-- salary: integer (nullable = true)
 |-- salary_currency: string (nullable = true)
 |-- salary_in_usd: integer (nullable = true)
 |-- employee_residence: string (nullable = true)
 |-- remote_ratio: integer (nullable = true)
 |-- company_location: string (nullable = true)
 |-- company_size: string (nullable = true)



In [74]:
df.select('job_title', 'salary').describe().show()

+-------+--------------------+------------------+
|summary|           job_title|            salary|
+-------+--------------------+------------------+
|  count|                3755|              3755|
|   mean|                null|190695.57177097205|
| stddev|                null| 671676.5005079063|
|    min|3D Computer Visio...|             10000|
|    max|Staff Data Scientist|             99750|
+-------+--------------------+------------------+



SQL Queries

In [175]:
from pyspark.sql import functions as F

# df.show()
# Add Boolean Column
df2 = df.select('experience_level', 
          'job_title',
          'company_location',
          F.when(df.salary > 300000, 1).otherwise(0).alias('Big_Sal'))\

# Select column by condition
df2[df2['Big_Sal'] == 1].show()


+----------------+--------------------+----------------+-------+
|experience_level|           job_title|company_location|Big_Sal|
+----------------+--------------------+----------------+-------+
|              SE|Computer Vision E...|              US|      1|
|              MI|Machine Learning ...|              IN|      1|
|              SE|   Applied Scientist|              US|      1|
|              MI|      Data Scientist|              HK|      1|
|              SE|Machine Learning ...|              US|      1|
|              SE|Machine Learning ...|              US|      1|
|              MI|Applied Data Scie...|              IN|      1|
|              SE|   Applied Scientist|              US|      1|
|              EN|       Data Engineer|              IN|      1|
|              EX|        Head of Data|              US|      1|
|              SE|Machine Learning ...|              US|      1|
|              EX|Director of Data ...|              US|      1|
|              SE|      D

Data Selection

In [188]:
# df.select("job_title", df.job_title.startswith("Data").alias("Data")).show()
# df[df.job_title.startswith("Data") == True].show()
df[df.job_title.endswith("Scientist") == True]


df[df.job_title.isin(["Data Scientist", "Data Analyst"]) == True].show()

+---------+----------------+---------------+--------------+------+---------------+-------------+------------------+------------+----------------+------------+
|work_year|experience_level|employment_type|     job_title|salary|salary_currency|salary_in_usd|employee_residence|remote_ratio|company_location|company_size|
+---------+----------------+---------------+--------------+------+---------------+-------------+------------------+------------+----------------+------------+
|     2023|              SE|             FT|Data Scientist|175000|            USD|       175000|                CA|         100|              CA|           M|
|     2023|              SE|             FT|Data Scientist|120000|            USD|       120000|                CA|         100|              CA|           M|
|     2023|              SE|             FT|Data Scientist|219000|            USD|       219000|                CA|           0|              CA|           M|
|     2023|              SE|             FT|Da

Group by

In [196]:
df.groupBy("job_title")\
    .count()\
    .sort("count", ascending=False)\
    .show()


+--------------------+-----+
|           job_title|count|
+--------------------+-----+
|       Data Engineer| 1040|
|      Data Scientist|  840|
|        Data Analyst|  612|
|Machine Learning ...|  289|
|  Analytics Engineer|  103|
|      Data Architect|  101|
|  Research Scientist|   82|
|   Applied Scientist|   58|
|Data Science Manager|   58|
|   Research Engineer|   37|
|         ML Engineer|   34|
|        Data Manager|   29|
|Machine Learning ...|   26|
|Data Science Cons...|   24|
|Data Analytics Ma...|   22|
|Computer Vision E...|   18|
|        AI Scientist|   16|
|Business Data Ana...|   15|
|     BI Data Analyst|   15|
|     Data Specialist|   14|
+--------------------+-----+
only showing top 20 rows

+--------------------+------------------+
|           job_title|       avg(salary)|
+--------------------+------------------+
|3D Computer Visio...|          120000.0|
|  Lead Data Engineer|140333.33333333334|
|        Data Modeler|          118900.0|
| Data Scientist Lead|    

In [201]:

df.groupBy("job_title")\
    .avg('salary')\
    .sort("avg(salary)", ascending=False)\
    .show()


+--------------------+------------------+
|           job_title|       avg(salary)|
+--------------------+------------------+
|Head of Machine L...|         6000000.0|
|Principal Data Ar...|         3000000.0|
|Lead Machine Lear...|2548666.6666666665|
| Lead Data Scientist| 928485.3333333334|
| Data Analytics Lead|          922500.0|
|     BI Data Analyst|          836644.8|
|Head of Data Science| 703729.4444444445|
|   Lead Data Analyst|          655000.0|
|         ML Engineer| 609997.9117647059|
|Product Data Analyst|          412000.0|
|  Power BI Developer|          400000.0|
|Data Science Manager| 379390.4137931034|
|Data Science Tech...|          375000.0|
|   Big Data Engineer| 365909.0909090909|
|Applied Machine L...| 306233.3333333333|
|Applied Data Scie...|          283200.0|
|        AI Scientist|          275312.5|
|Machine Learning ...|          260750.0|
|Business Data Ana...|          256200.0|
|Cloud Data Architect|          250000.0|
+--------------------+------------

Multiple Aggregations and HAVING clause

In [209]:
from pyspark.sql.functions import sum,avg,max,count

df.groupBy("job_title")\
    .agg(count("*").alias("Num_Jobs"), avg('salary').alias("AVG_Sal"))\
    .where(col("Num_Jobs") > 10)\
    .sort("AVG_Sal", ascending=False)\
    .show()



+--------------------+--------+------------------+
|           job_title|Num_Jobs|           AVG_Sal|
+--------------------+--------+------------------+
|     BI Data Analyst|      15|          836644.8|
|         ML Engineer|      34| 609997.9117647059|
|Data Science Manager|      58| 379390.4137931034|
|   Big Data Engineer|      11| 365909.0909090909|
|Applied Machine L...|      12| 306233.3333333333|
|        AI Scientist|      16|          275312.5|
|Business Data Ana...|      15|          256200.0|
|      Data Scientist|     840|239073.47619047618|
|Computer Vision E...|      18|224966.66666666666|
|Director of Data ...|      11|198227.27272727274|
|   Applied Scientist|      58| 190264.4827586207|
|Machine Learning ...|     289|182216.03460207614|
|Machine Learning ...|      26|163155.76923076922|
|   Research Engineer|      37| 162752.8108108108|
|      Data Architect|     101|161713.77227722772|
|  Research Scientist|      82|160768.89024390245|
|       Data Engineer|    1040|

Same Query in SQL

In [220]:
# Create Temporary table in PySpark
df.createOrReplaceTempView("Jobs")

sql_q = \
    "SELECT job_title, "\
        "COUNT(*) AS Num_Jobs, "\
        "INT(AVG(salary)) AS AVG_Sal "\
    "FROM Jobs "\
    "GROUP BY job_title "\
    "HAVING Num_Jobs > 10 "\
    "ORDER BY AVG_Sal DESC"

spark.sql(sql_q).show()


+--------------------+--------+-------+
|           job_title|Num_Jobs|AVG_Sal|
+--------------------+--------+-------+
|     BI Data Analyst|      15| 836644|
|         ML Engineer|      34| 609997|
|Data Science Manager|      58| 379390|
|   Big Data Engineer|      11| 365909|
|Applied Machine L...|      12| 306233|
|        AI Scientist|      16| 275312|
|Business Data Ana...|      15| 256200|
|      Data Scientist|     840| 239073|
|Computer Vision E...|      18| 224966|
|Director of Data ...|      11| 198227|
|   Applied Scientist|      58| 190264|
|Machine Learning ...|     289| 182216|
|Machine Learning ...|      26| 163155|
|   Research Engineer|      37| 162752|
|      Data Architect|     101| 161713|
|  Research Scientist|      82| 160768|
|       Data Engineer|    1040| 156574|
|  Analytics Engineer|     103| 151352|
|Data Science Cons...|      24| 141937|
|Data Analytics Ma...|      22| 141879|
+--------------------+--------+-------+
only showing top 20 rows



Create dataframe manually

In [48]:
from datetime import datetime, date
import pandas as pd

my_df = spark.createDataFrame([
    (1, 2., 'string1', date(2000, 1, 1), datetime(2000, 1, 1, 12, 0)),
    (2, 3., 'string2', date(2000, 2, 1), datetime(2000, 1, 2, 12, 0)),
    (3, 4., 'string3', date(2000, 3, 1), datetime(2000, 1, 3, 12, 0))
], schema='a long, b double, c string, d date, e timestamp')

pd_df = pd.DataFrame({'a': [1,2,3], 'b': [1,2,3]})
my_pd_df = spark.createDataFrame(pd_df)
[my_df, my_pd_df]


[DataFrame[a: bigint, b: double, c: string, d: date, e: timestamp],
 DataFrame[a: bigint, b: bigint]]

In [14]:
spark2 = ss.newSession

In [30]:
df = spark.createDataFrame(
    [("Scala", 25000), ("Spark", 35000), ("PHP", 21000)])


In [None]:
df# df.show()

: 

In [7]:
API_KEY

'ee6cb9a9fcd54883ab7200357230507'

In [8]:
url = "http://api.weatherapi.com/v1/current.json"

querystring = {"q": "Canada", "aqi": "yes"}

headers = {
	"key": API_KEY,
}

response = requests.get(url, headers=headers, params=querystring)
response.json()
print(json.dumps(response.json(), indent=2))

{
  "location": {
    "name": "Ottawa",
    "region": "Ontario",
    "country": "Canada",
    "lat": 45.42,
    "lon": -75.7,
    "tz_id": "America/Toronto",
    "localtime_epoch": 1694016710,
    "localtime": "2023-09-06 12:11"
  },
  "current": {
    "last_updated_epoch": 1694016000,
    "last_updated": "2023-09-06 12:00",
    "temp_c": 29.0,
    "temp_f": 84.2,
    "is_day": 1,
    "condition": {
      "text": "Partly cloudy",
      "icon": "//cdn.weatherapi.com/weather/64x64/day/116.png",
      "code": 1003
    },
    "wind_mph": 3.8,
    "wind_kph": 6.1,
    "wind_degree": 210,
    "wind_dir": "SSW",
    "pressure_mb": 1011.0,
    "pressure_in": 29.86,
    "precip_mm": 0.0,
    "precip_in": 0.0,
    "humidity": 66,
    "cloud": 25,
    "feelslike_c": 31.2,
    "feelslike_f": 88.2,
    "vis_km": 24.0,
    "vis_miles": 14.0,
    "uv": 8.0,
    "gust_mph": 6.5,
    "gust_kph": 10.4,
    "air_quality": {
      "co": 243.7,
      "no2": 0.5,
      "o3": 100.1,
      "so2": 0.6,
      "