<a href="https://colab.research.google.com/github/AlinaSabitova/-_-/blob/main/analyze_spark.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
#!/usr/bin/env python3
"""
Анализ зарплат в Data Science с использованием PySpark
Задача: найти среднюю зарплату по уровню опыта
"""
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, avg, count, stddev, min as spark_min, max as spark_max, median
from pyspark.sql.types import FloatType
import sys

def create_spark_session():
    """Создать Spark сессию"""
    spark = SparkSession.builder \
        .appName("Salary Analysis") \
        .master("local[*]") \
        .config("spark.driver.memory", "2g") \
        .config("spark.executor.memory", "2g") \
        .getOrCreate()
    return spark

def load_data(spark, filepath):
    """Загрузить данные из HDFS или локального файла"""
    print(f"Загрузка данных из: {filepath}")

    try:
        # Попробовать загрузить из HDFS
        df = spark.read.csv(filepath, header=True, inferSchema=True)
        print(f"Данные загружены из HDFS")
    except:
        # Если не удалось, загрузить локально
        df = spark.read.csv(filepath, header=True, inferSchema=True)
        print(f"Данные загружены локально")

    print(f"Количество строк: {df.count()}")
    return df

def clean_and_prepare(df):
    """Очистка и подготовка данных"""
    print("\n=== Очистка данных ===")
    print(f"Исходное количество строк: {df.count()}")

    # Удалить строки с null в зарплате
    df = df.filter(col('salary_in_usd').isNotNull())

    # Заполнить null в experience_level
    df = df.na.fill('Unknown', subset=['experience_level'])

    # Создать колонку с полными названиями уровней опыта
    from pyspark.sql.functions import when
    df = df.withColumn(
        'experience_level_name',
        when(col('experience_level') == 'EN', 'Entry-level')
        .when(col('experience_level') == 'MI', 'Mid-level')
        .when(col('experience_level') == 'SE', 'Senior-level')
        .when(col('experience_level') == 'EX', 'Executive-level')
        .otherwise('Unknown')
    )

    print(f"Количество строк после очистки: {df.count()}")
    print(f"Уникальных уровней опыта: {df.select('experience_level').distinct().count()}")

    return df

def analyze_salary_by_experience(df):
    """Анализ средней зарплаты по уровням опыта"""
    print("\n=== Анализ средней зарплаты по уровням опыта ===")

    # Группировка и агрегация
    result = df.groupBy('experience_level', 'experience_level_name') \
        .agg(
            avg('salary_in_usd').alias('Mean_Salary_USD'),
            count('*').alias('Count'),
            stddev('salary_in_usd').alias('Std_Deviation'),
            spark_min('salary_in_usd').alias('Min_Salary'),
            spark_max('salary_in_usd').alias('Max_Salary')
        ) \
        .orderBy(col('Mean_Salary_USD').desc())

    return result

def additional_analysis(df):
    """Дополнительный анализ данных"""
    print("\n=== Дополнительная статистика ===")

    # Общая статистика по зарплатам
    salary_stats = df.agg(
        avg('salary_in_usd').alias('avg_salary'),
        spark_min('salary_in_usd').alias('min_salary'),
        spark_max('salary_in_usd').alias('max_salary')
    ).collect()[0]

    print(f"Общая статистика зарплат в USD:")
    print(f"Средняя зарплата: ${salary_stats['avg_salary']:,.2f}")
    print(f"Минимальная зарплата: ${salary_stats['min_salary']:,.2f}")
    print(f"Максимальная зарплата: ${salary_stats['max_salary']:,.2f}")

    # Распределение по уровням опыта
    print(f"\nРаспределение по уровням опыта:")
    experience_counts = df.groupBy('experience_level_name').agg(count('*').alias('count')).collect()
    total_count = df.count()

    for row in experience_counts:
        percentage = (row['count'] / total_count) * 100
        print(f"  {row['experience_level_name']}: {row['count']} записей ({percentage:.1f}%)")

def main():
    # Путь к данным
    hdfs_path = "hdfs://localhost:9000/user/hadoop/input/salary_data.csv"
    local_path = "/opt/data/salary_data.csv"

    # Создать Spark сессию
    spark = create_spark_session()

    print("=== Анализ зарплат в Data Science с использованием PySpark ===")

    # Показать конфигурацию
    print("\n=== Конфигурация Spark ===")
    print(f"Version: {spark.version}")
    print(f"Master: {spark.sparkContext.master}")

    # Загрузить данные
    try:
        df = load_data(spark, hdfs_path)
    except Exception as e:
        print(f"Не удалось загрузить из HDFS: {e}")
        print("Попытка загрузить локально...")
        df = load_data(spark, local_path)

    # Показать схему и первые строки
    print("\n=== Схема данных ===")
    df.printSchema()
    print("\nПервые 5 строк (ключевые колонки):")
    df.select('work_year', 'experience_level', 'job_title', 'salary_in_usd').show(5)

    # Очистка данных
    df_clean = clean_and_prepare(df)

    # Анализ
    result = analyze_salary_by_experience(df_clean)

    # Показать результаты
    print("\n=== Результаты анализа зарплат ===")
    print("\nУровни опыта по средней зарплате:")
    result.show(truncate=False)

    # Найти уровень с максимальной и минимальной зарплатой
    result_list = result.collect()
    max_salary_row = result_list[0]
    min_salary_row = result_list[-1]

    print(f"\nУровень опыта с максимальной средней зарплатой: '{max_salary_row['experience_level_name']}' ({max_salary_row['experience_level']})")
    print(f"Средняя максимальная зарплата: ${max_salary_row['Mean_Salary_USD']:,.2f} USD")
    print(f"Количество специалистов с максимальной зарплатой: {max_salary_row['Count']}")
    print(f"Диапазон зарплат: ${max_salary_row['Min_Salary']:,.2f} - ${max_salary_row['Max_Salary']:,.2f} USD")

    print(f"\nУровень опыта с минимальной средней зарплатой: '{min_salary_row['experience_level_name']}' ({min_salary_row['experience_level']})")
    print(f"Средняя минимальная зарплата: ${min_salary_row['Mean_Salary_USD']:,.2f} USD")
    print(f"Количество специалистов с минимальной зарплатой: {min_salary_row['Count']}")
    print(f"Диапазон зарплат: ${min_salary_row['Min_Salary']:,.2f} - ${min_salary_row['Max_Salary']:,.2f} USD")

    # Дополнительный анализ
    additional_analysis(df_clean)

    # Сохранить результаты
    output_path = "results/salary_by_experience_spark"
    result.coalesce(1).write.mode("overwrite").option("header", "true").csv(output_path)
    print(f"\nРезультаты сохранены в: {output_path}")

    # Попробовать сохранить в HDFS
    try:
        hdfs_output = "hdfs://localhost:9000/user/hadoop/output/salary_by_experience"
        result.coalesce(1).write.mode("overwrite").option("header", "true").csv(hdfs_output)
        print(f"Результаты также сохранены в HDFS: {hdfs_output}")
    except Exception as e:
        print(f"Не удалось сохранить в HDFS: {e}")

    spark.stop()

if __name__ == '__main__':
    main()