<a href="https://colab.research.google.com/github/Kerei1988/-/blob/main/myspark.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Тема дипломной:
# № 5. Анализ и сравнение различных способов обработки и хранения больших данных:
# Pandas, Dask и Apache Spark

In [None]:

!pip install pyspark
!pip install pandas




In [None]:
from pyspark.sql import SparkSession, DataFrame, Row
import matplotlib.pyplot as plt
from pyspark.sql.functions import *
from collections import defaultdict
from pyspark.sql.types import FloatType, DecimalType, IntegerType



# Открываем сессию Spark, для чтения базы данных с файла, для дальнейшей обратботки и анализа
spark = SparkSession.builder.getOrCreate()

# Читаем файл csv
sdf = spark.read.option('multiline', True).csv('Data-Science-Jobs.csv', header=True)

# Визуальный осмотр данных (первые 5 строк)
sdf.show(5, truncate=100)

# Информация о базе данных: база данных сотсовляет из 500 строк,
# столбцы  ' Salary', '  Logo ', ' Company Rating' включают пропущенные значени
# Столбцы 'Salary' и 'Company Rating',  'Date' - тип данных String,
sdf.printSchema()
sdf.describe().show()

# Около 13% строк, содержащие не заполненные поля,  удаляем. Для анализа данных останется 387 строк
df = sdf.dropna()
df_count = df.count()
print(df_count)
df.describe().show()



In [None]:

# TOP 10 популярныx региона США, для найма специалистов в области обработки данных , визуализируем для удобного восприятия.
# график показывает что наибольшее количество вакансий Riverwoods, IL(31), Remote (22), New York, NY (21)
location = df.groupBy('Location').count().orderBy('count', ascending=False)
pandas_location = location.toPandas().set_index('Location').head(10)
pandas_location.plot.bar()

# Найдем города с наибольши и наименьшим спросом на специалистов.
# Количество вакансии меньше 6 - 117 регион
# Количество вакинсий больше 15 - 3 региона
small_location = location.filter(location['count'] <= 5)
print(small_location.count())
most_location = location.filter(location['count'] >= 15)
print(most_location.count())
# количество специальностей - 223
number_specialists = df.select('Job Title').drop_duplicates()
number_specialists.count()

# Самые востребованные специалисты:
# 1 -  Data Scientist
# 2 - Senior Data Scientist
# 3 - Senior Manager Data Scientist, Principal Data Science
most_popular_specialties = df.groupBy('Job Title').count().sort('count', ascending=False).show(truncate=100)

# количество наименее востребованных специалистов, с одной вакансией на рынке труда - 154 вакансий
least_popular_specialties = df.groupBy('Job Title').count().filter('count < 2').count()
print(least_popular_specialties)


In [None]:
# найдем среднее количество дней актуальности вакансий - 19+
# минимальный срок актуальности - 1
# максимальное количество дней - 30
date_df = df[['Date']]
date_df = date_df.withColumn('Date', regexp_replace('Date', '24h', '1d'))
date_df = date_df.withColumn('Date', regexp_extract('Date', r'\b(\d+).+', 1).astype(IntegerType()))
date_min = date_df.select(min(date_df.Date)).show()
date_max = date_df.agg({'Date': 'max'}).show()
date_mean = date_df.select(mean(date_df.Date)).show()

In [None]:

# найдем самого активного работодателя на рынке труда из базы данных - 'Discover Financial Services' (31 вакания)
# Общее количество работодателей 188
# Количество компаний с одной вакансией на рынке труда -117
# отобразим на графике 10 самых активных работодателей

company_name = df[['Company Name']].withColumn('Company Name', regexp_replace('Company Name', '(\n\d\.\d)', ''))
number_vacancies_per_company = company_name.groupBy('Company Name').count().sort('count', ascending=False) # Количество вакансий на 1 работодателя
number_vacancies = number_vacancies_per_company.filter(number_vacancies_per_company['count'] < 2).drop_duplicates().count() #
print(number_vacancies)
number_companies = company_name.drop_duplicates().count() #количество работодателей

# десятка самых активных работодателей базы данных
part = number_vacancies_per_company.withColumn('count', (number_vacancies_per_company['count']/number_companies)*100)
part_pandas = part.toPandas().set_index('Company Name').head(10)
part_pandas.plot.barh()
plt.show()

In [None]:
# Для анализа рынка труда, нужно столбец "Salary", преобразовать в числовой тип

salary = df[['Job Title', 'Salary']].withColumn('Salary', translate('Salary', '$|K', '').alias('Salary'))
salary_per = salary.filter(salary.Salary.like('%Per%'))
salary_month = salary.join(salary_per[['Job Title', 'Salary']], 'Salary', how='left_anti')

salary_per =  salary_per.withColumn('Salary', regexp_extract('Salary', '(\d\d.\d\d - \d\d.\d\d)|(\d\d)', 0))
salary_per = salary_per.withColumn('Salary_1', split('Salary', '-')[1].cast(DecimalType(5,2)))
salary_per = salary_per.withColumn('Salary_0', split('Salary', '-')[0])
salary_per = salary_per.withColumn('Salary', coalesce((col('Salary_1')+col('Salary_0'))/2, lit(salary_per['Salary_0'])))
salary_per = salary_per.withColumn('Salary', col('Salary')*8*22/1000)
salary_per = salary_per.withColumn('Salary, $, K', col('Salary').cast(DecimalType(5,2)))
salary_per = salary_per.select('Job Title', 'Salary, $, K')

salary_month = salary_month.withColumn('Salary', translate('Salary', ' ', ''))
salary_month = salary_month.withColumn('Salary', regexp_extract('Salary', '\d+-\d+|\d+', 0))
salary_month = salary_month.withColumn('Salary_1', split('Salary', '-')[1].cast(DecimalType(5,2)))
salary_month = salary_month.withColumn('Salary_0', split('Salary', '-')[0])
salary_month = salary_month.withColumn('Salary, $, K', coalesce((col('Salary_1')+col('Salary_0'))/2, lit(salary_month['Salary_0'])).cast(DecimalType(5,2)))
salary_month = salary_month.select('Job Title', 'Salary, $, K')

# Преобразованная таблица 'Salary'
salary = salary_month.join(salary_per, ['Job Title', 'Salary, $, K'], how='outer')

# Максимальная оплата труда - 4 вакансии 'Reliability, Availability and Serviceability Expert, Datacenter AI Products Development'($272.5 K) от компании NVIDIA
# Минимальная - Junior Data Science ($2.46 K)
salary_max = salary.orderBy('Salary, $, K', ascending=False)
salary_min = salary.orderBy('Salary, $, K', ascending=True)

#Средняя заработная плата по рынку состовляет 117.405
avg_salary = salary.agg({'Salary, $, K': 'mean'})

# Лучшие работодатели по рейтингу
rating_company = df[['Company Name', 'Company Rating']]
rating_company = df[['Company Name','Company Rating']].withColumn('Company Name', regexp_replace('Company Name', '(\n\d\.\d)', '')).sort('Company Rating', ascending=False)
pandas_rating = rating_company.toPandas().set_index('Company Name').astype(float).drop_duplicates().head(10)
pandas_rating.plot.barh()



