In [69]:
from pyspark.sql import SparkSession
from pyspark.context import SparkContext
import pandas as pd
import os
from pyspark.sql.types import StructType, StructField, IntegerType, StringType
ss = SparkSession \ 
    .builder \
    .appName("Python Spark SQL basic example") \
    .config("spark.some.config.option", "some-value") \
    .getOrCreate()


In [79]:
schema = StructType(fields=[ 
    StructField("player_id", IntegerType()),
    StructField("session_id", StringType ()),
    StructField("event", StringType ())]
    )

In [80]:
sessions = open("sessions.csv", "w+")
df = ss.read.csv("sessions.csv", schema=schema, sep=";")
df.show()


+---------+----------+-----+
|player_id|session_id|event|
+---------+----------+-----+
+---------+----------+-----+



In [106]:
import random
import uuid

event = []
player_id = []
session_id = []

for x in range(5):
    event.append(int(random.uniform(0,2)))
    player_id.append(uuid.uuid4())
    session_id.append(uuid.uuid4())
df1 = pd.DataFrame(zip(player_id, session_id, event), columns=['player_id', 'session_id', 'event'])
df1

Unnamed: 0,player_id,session_id,event
0,f78f347c-a9f9-449c-8518-96a157d03958,a310641c-9846-4c5e-811f-0f7ec9d4e79e,0
1,e8c49748-9471-4aa1-b215-ca61b93b74c1,af9efe21-57b6-4cc6-8316-2f251be58585,0
2,645f3d8c-1c85-4e90-bb42-ea1d401b6845,e9d0bccb-d016-4dee-be1c-1541b739c47f,1
3,9a6a1678-3db0-4cd2-b3b4-29cd41e8ba87,4f57d97a-34d9-44bc-a9a9-2101710e7236,0
4,445013be-57fb-4da3-904d-dae3919068cb,3c280879-e26e-4c37-92fa-413a2a4ebafd,1


In [89]:
import avro
import avro.schema
import json
from avro.datafile import DataFileReader, DataFileWriter
from avro.io import DatumReader, DatumWriter
import fastavro 
from fastavro import writer
import copy

In [76]:
schema = {
    'doc': 'A weather reading.',
    'name': 'Weather',
    'namespace': 'test',
    'type': 'record',
    'fields': [
        {'name': 'station', 'type': 'string'},
        {'name': 'time', 'type': 'long'},
        {'name': 'temp', 'type': 'int'},
    ],
}

records = [
    {'station': '011990-99999', 'temp': 0, 'time': 1433269388},
    {'station': '011990-99999', 'temp': 22, 'time': 1433270389},
    {'station': '011990-99999', 'temp': -11, 'time': 1433273379},
    {'station': '012650-99999', 'temp': 111, 'time': 1433275478},
]

with open('weather.avro', 'wb') as out:
    writer(out, schema, records)
schema_parsed = avro.schema.parse(json.dumps(schema))

In [92]:
with open('weather.avro', 'wb') as f:
    writer = DataFileWriter(f, DatumWriter(), schema_parsed)
    writer.append({'station': '012334', 'temp': 77, 'time': 1323434})
    writer.close()

with open('weather.avro', 'rb') as f:
    reader = DataFileReader(f, DatumReader())
    metadata = copy.deepcopy(reader.meta)
    schema_from_file = json.loads(metadata['avro.schema'])
    station = [station for station in reader]
    reader.close()

print(f'Schema that we specified:\n {schema}')
print(f'Schema that we parsed:\n {schema_parsed}')
print(f'Schema from users.avro file:\n {schema_from_file}')
print(f'Stations:\n {station}')

Schema that we specified:
 StructType([StructField('player_id', IntegerType(), True), StructField('session_id', StringType(), True), StructField('event', StringType(), True)])
Schema that we parsed:
 {"type": "record", "name": "Weather", "namespace": "test", "fields": [{"type": "string", "name": "station"}, {"type": "long", "name": "time"}, {"type": "int", "name": "temp"}], "doc": "A weather reading."}
Schema from users.avro file:
 {'type': 'record', 'name': 'Weather', 'namespace': 'test', 'fields': [{'type': 'string', 'name': 'station'}, {'type': 'long', 'name': 'time'}, {'type': 'int', 'name': 'temp'}], 'doc': 'A weather reading.'}
Stations:
 [{'station': '012334', 'time': 1323434, 'temp': 77}]


In [None]:
! pip3 install kaggle 
! export KAGGLE_USERNAME= my_name
! export KAGGLE_KEY= my_key

In [None]:
! kaggle datasets download  --force -d ruchi798/data-science-job-salaries
! unzip data-science-job-salaries

Downloading data-science-job-salaries.zip to /Users/ekaterinaakulova/DataScience
  0%|                                               | 0.00/7.37k [00:00<?, ?B/s]
100%|██████████████████████████████████████| 7.37k/7.37k [00:00<00:00, 2.62MB/s]
Archive:  data-science-job-salaries.zip
replace ds_salaries.csv? [y]es, [n]o, [A]ll, [N]one, [r]ename: 

In [95]:
pd_df = pd.read_csv('ds_salaries.csv')

CPU times: user 8.32 ms, sys: 4.66 ms, total: 13 ms
Wall time: 16.9 ms


In [137]:
pd_df.head(5)

CPU times: user 632 µs, sys: 2.56 ms, total: 3.19 ms
Wall time: 5.33 ms


Unnamed: 0.1,Unnamed: 0,work_year,experience_level,employment_type,job_title,salary,salary_currency,salary_in_usd,employee_residence,remote_ratio,company_location,company_size
0,0,2020,MI,FT,Data Scientist,70000,EUR,79833,DE,0,DE,L
1,1,2020,SE,FT,Machine Learning Scientist,260000,USD,260000,JP,0,JP,S
2,2,2020,SE,FT,Big Data Engineer,85000,GBP,109024,GB,50,GB,M
3,3,2020,MI,FT,Product Data Analyst,20000,USD,20000,HN,0,HN,S
4,4,2020,SE,FT,Machine Learning Engineer,150000,USD,150000,US,50,US,L


In [97]:
csv_file = 'ds_salaries.csv'
df_csv_ss = ss.read.load(csv_file,
                     format="csv", sep=";", inferSchema="true", header="true") #spark csv

CPU times: user 3.16 ms, sys: 3.1 ms, total: 6.26 ms
Wall time: 1.87 s


In [98]:
df_csv_ss.head(5)

CPU times: user 5.88 ms, sys: 5.98 ms, total: 11.9 ms
Wall time: 202 ms


[Row(,work_year,experience_level,employment_type,job_title,salary,salary_currency,salary_in_usd,employee_residence,remote_ratio,company_location,company_size='0,2020,MI,FT,Data Scientist,70000,EUR,79833,DE,0,DE,L'),
 Row(,work_year,experience_level,employment_type,job_title,salary,salary_currency,salary_in_usd,employee_residence,remote_ratio,company_location,company_size='1,2020,SE,FT,Machine Learning Scientist,260000,USD,260000,JP,0,JP,S'),
 Row(,work_year,experience_level,employment_type,job_title,salary,salary_currency,salary_in_usd,employee_residence,remote_ratio,company_location,company_size='2,2020,SE,FT,Big Data Engineer,85000,GBP,109024,GB,50,GB,M'),
 Row(,work_year,experience_level,employment_type,job_title,salary,salary_currency,salary_in_usd,employee_residence,remote_ratio,company_location,company_size='3,2020,MI,FT,Product Data Analyst,20000,USD,20000,HN,0,HN,S'),
 Row(,work_year,experience_level,employment_type,job_title,salary,salary_currency,salary_in_usd,employee_reside

In [330]:
pd_df.to_parquet('ds_salaries.parquet') #Parquet — формат файлов,
#который хранит вложенные структуры данных в плоском столбчатом формате.
#Столбчатый формат более эффективен, когда вам нужно запросить из таблицы несколько столбцов. 
#Он прочитает только необходимые столбцы, потому что они находятся по соседству. Операции ввода-вывода сводятся к минимуму.

In [141]:
df_parquet.head(5) #формат данных parquet считывается быстрее 

CPU times: user 3.91 ms, sys: 2.95 ms, total: 6.86 ms
Wall time: 87.7 ms


[Row(Unnamed: 0=0, work_year=2020, experience_level='MI', employment_type='FT', job_title='Data Scientist', salary=70000, salary_currency='EUR', salary_in_usd=79833, employee_residence='DE', remote_ratio=0, company_location='DE', company_size='L'),
 Row(Unnamed: 0=1, work_year=2020, experience_level='SE', employment_type='FT', job_title='Machine Learning Scientist', salary=260000, salary_currency='USD', salary_in_usd=260000, employee_residence='JP', remote_ratio=0, company_location='JP', company_size='S'),
 Row(Unnamed: 0=2, work_year=2020, experience_level='SE', employment_type='FT', job_title='Big Data Engineer', salary=85000, salary_currency='GBP', salary_in_usd=109024, employee_residence='GB', remote_ratio=50, company_location='GB', company_size='M'),
 Row(Unnamed: 0=3, work_year=2020, experience_level='MI', employment_type='FT', job_title='Product Data Analyst', salary=20000, salary_currency='USD', salary_in_usd=20000, employee_residence='HN', remote_ratio=0, company_location='HN'

In [353]:
df_parquet = ss.read.load('ds_salaries.parquet')
df_parquet.select("job_title", "salary_in_usd").write.save("salaries_jobs.parquet") 

CPU times: user 3.57 ms, sys: 2.41 ms, total: 5.99 ms
Wall time: 408 ms


In [108]:
df = ss.read.parquet("salaries_jobs.parquet")
df.head(5)

CPU times: user 7.45 ms, sys: 14.9 ms, total: 22.3 ms
Wall time: 1.39 s


[Stage 25:>                                                         (0 + 1) / 1]                                                                                

[Row(job_title='Data Scientist', salary_in_usd=79833),
 Row(job_title='Machine Learning Scientist', salary_in_usd=260000),
 Row(job_title='Big Data Engineer', salary_in_usd=109024),
 Row(job_title='Product Data Analyst', salary_in_usd=20000),
 Row(job_title='Machine Learning Engineer', salary_in_usd=150000)]

In [101]:
spark_df = ss.read.option('header', True).csv('ds_salaries.csv')
spark_df.printSchema() 

root
 |-- _c0: string (nullable = true)
 |-- work_year: string (nullable = true)
 |-- experience_level: string (nullable = true)
 |-- employment_type: string (nullable = true)
 |-- job_title: string (nullable = true)
 |-- salary: string (nullable = true)
 |-- salary_currency: string (nullable = true)
 |-- salary_in_usd: string (nullable = true)
 |-- employee_residence: string (nullable = true)
 |-- remote_ratio: string (nullable = true)
 |-- company_location: string (nullable = true)
 |-- company_size: string (nullable = true)

CPU times: user 3.12 ms, sys: 2.3 ms, total: 5.42 ms
Wall time: 325 ms


In [369]:
spark_df.count(), len(spark_df.columns)

CPU times: user 2.14 ms, sys: 2.17 ms, total: 4.32 ms
Wall time: 129 ms


(607, 12)

In [376]:
spark_df = spark_df.select("work_year", "job_title", "salary_in_usd", "company_location", "company_size") 

CPU times: user 3.81 ms, sys: 2.16 ms, total: 5.97 ms
Wall time: 21.2 ms


In [201]:
cfg = [
    ('spark.driver.memory', '4g'), #выделяем 4гб оперативной памяти процессу
    ('spark.driver.maxResultSize', '2g'), #исполнители будут давать результаты размером макс 2 гь
    ('spark.executor.memory', '1g'), #1гб занимает каждый исполнитель
    ('spark.executor.instances', '2'), #количество исполнителей
    ('spark.dinamicAlocation.enabled', 'false'), #флаг отвечает за динамическое распределение ресурсов
    ('spark.default.parallelism', '8'), #на сколько частей будут разбиваться наборы данных чтобы работать с ними параллельно
    ('spark.eventlog.enabled', 'false') #более делально смотрит за работой спарка и читает логи
]

In [None]:
from pyspark import SparkContext, SparkConf, HiveContext, SQLContext

conf = SparkConf().setAll(cfg)
ss.stop() 
sc = SparkContext(appName = 'example', conf = conf)
ss = SparkSession(sc).builder.enableHiveSupport().getOrCreate()
hc = HiveContext(sc)

In [118]:
from pyspark.sql.functions import col, lit
spark_df.filter((col('work_year') == lit('2022')))

22/08/29 13:53:55 WARN CSVHeaderChecker: CSV header does not conform to the schema.
 Header: , work_year, experience_level, employment_type, job_title, salary, salary_currency, salary_in_usd, employee_residence, remote_ratio, company_location, company_size
 Schema: _c0, work_year, experience_level, employment_type, job_title, salary, salary_currency, salary_in_usd, employee_residence, remote_ratio, company_location, company_size
Expected: _c0 but found: 
CSV file: file:///Users/ekaterinaakulova/DataScience/ds_salaries.csv
22/08/29 13:53:55 WARN CSVHeaderChecker: CSV header does not conform to the schema.
 Header: , work_year, experience_level, employment_type, job_title, salary, salary_currency, salary_in_usd, employee_residence, remote_ratio, company_location, company_size
 Schema: _c0, work_year, experience_level, employment_type, job_title, salary, salary_currency, salary_in_usd, employee_residence, remote_ratio, company_location, company_size
Expected: _c0 but found: 
CSV file: fil

_c0,work_year,experience_level,employment_type,job_title,salary,salary_currency,salary_in_usd,employee_residence,remote_ratio,company_location,company_size
289,2022,SE,FT,Data Engineer,135000,USD,135000,US,100,US,M
290,2022,SE,FT,Data Analyst,155000,USD,155000,US,100,US,M
291,2022,SE,FT,Data Analyst,120600,USD,120600,US,100,US,M
292,2022,MI,FT,Data Scientist,130000,USD,130000,US,0,US,M
293,2022,MI,FT,Data Scientist,90000,USD,90000,US,0,US,M
294,2022,MI,FT,Data Engineer,170000,USD,170000,US,100,US,M
295,2022,MI,FT,Data Engineer,150000,USD,150000,US,100,US,M
296,2022,SE,FT,Data Analyst,102100,USD,102100,US,100,US,M
297,2022,SE,FT,Data Analyst,84900,USD,84900,US,100,US,M
298,2022,SE,FT,Data Scientist,136620,USD,136620,US,100,US,M


In [146]:
from pyspark.sql import functions as f

spark_df.filter((col('work_year') >= lit('2020')) & (col('work_year') <= lit('2022')))\
    .groupBy("experience_level") \
    .agg(f.min("work_year").alias("from"), 
         f.max("work_year").alias("to"), 
         
         f.min("salary_in_usd").alias("min salary in usd"),
         f.avg("salary_in_usd").alias("average salary in usd")
      ).show()

+----------------+----+----+-----------------+---------------------+
|experience_level|from|  to|min salary in usd|average salary in usd|
+----------------+----+----+-----------------+---------------------+
|              EN|2020|2022|            10000|   61643.318181818184|
|              EX|2020|2022|           110000|   199392.03846153847|
|              MI|2020|2022|           100000|    87996.05633802817|
|              SE|2020|2022|           100000|   138617.29285714286|
+----------------+----+----+-----------------+---------------------+



In [314]:
col = spark_df['company_location']
col

CPU times: user 729 µs, sys: 245 µs, total: 974 µs
Wall time: 936 µs


Column<'company_location'>

In [315]:
from pyspark.sql import Column
from pyspark.sql.functions import lower

type(spark_df.salary_in_usd) == type(spark_df.salary_in_usd + 1) == type(spark_df.salary_in_usd.isNull())

True

In [317]:
spark_df_0 = spark_df.select('company_location', 'salary_in_usd')
spark_df_0.withColumn('salary_in_usd + 1', spark_df_0.salary_in_usd + 1).show() 

+----------------+-------------+-----------------+
|company_location|salary_in_usd|salary_in_usd + 1|
+----------------+-------------+-----------------+
|              DE|        79833|          79834.0|
|              JP|       260000|         260001.0|
|              GB|       109024|         109025.0|
|              HN|        20000|          20001.0|
|              US|       150000|         150001.0|
|              US|        72000|          72001.0|
|              US|       190000|         190001.0|
|              HU|        35735|          35736.0|
|              US|       135000|         135001.0|
|              NZ|       125000|         125001.0|
|              FR|        51321|          51322.0|
|              IN|        40481|          40482.0|
|              FR|        39916|          39917.0|
|              US|        87000|          87001.0|
|              US|        85000|          85001.0|
|              PK|         8000|           8001.0|
|              JP|        41689

In [318]:
spark_df_0.filter(spark_df.company_location == 'RU') 

company_location,salary_in_usd
RU,230000
RU,85000
