# spark wordcount 1

In [None]:
!pip install pyspark

In [None]:
!pip install seaborn

In [None]:
!pip install tabulate

In [None]:
# import libraries
import pyspark
from pyspark import SparkConf
from pyspark.sql import SparkSession

from pyspark.sql.window import Window
from pyspark.sql.functions import udf, isnan, min, max, sum, count, desc, expr, avg
from pyspark.sql.types import IntegerType, LongType

from pyspark.ml.feature import StandardScaler, VectorAssembler, MinMaxScaler
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder, CrossValidatorModel
from pyspark.ml import Pipeline
from pyspark.ml.classification import DecisionTreeClassifier, RandomForestClassifier, LogisticRegression, GBTClassifier, LogisticRegressionModel, GBTClassificationModel, RandomForestClassificationModel

import matplotlib.pyplot as plt
import seaborn as sns
import datetime
import numpy as np
import pandas as pd
import re  # Добавляем импорт модуля re
from tabulate import tabulate
import matplotlib.pyplot as plt

In [None]:
from pyspark.sql import SparkSession

# Создание SparkSession
spark = SparkSession.builder \
    .appName("WordCount App") \
    .config("spark.hadoop.fs.defaultFS", "hdfs://localhost:9000") \
    .config("spark.ui.port", "4050") \
    .getOrCreate()

# Установка количества разделов для shuffle операций
spark.conf.set("spark.sql.shuffle.partitions", "50")

# Чтение данных из HDFS (текстовый файл)
file_path = "hdfs://localhost:9000/user5/sparkdir/idiot.txt"
df = spark.read.text(file_path)  # Используем .text, так как это текстовый файл

# Печать первых нескольких строк
df.show()



In [None]:
# Преобразование текста в RDD для подсчета частоты слов
text_rdd = df.rdd.flatMap(lambda row: row[0].split())

def clean_word(word):
    # Приводим слово к нижнему регистру
    word = word.lower()
    # Убираем все спецсимволы и цифры
    word = re.sub(r'[^a-zA-Zа-яА-ЯёЁ]', '', word)  # Исправлено регулярное выражение
    # Убираем начальные и конечные пробелы
    word = word.strip()
    return word

# Очистка и фильтрация слов
text_rdd_cleaned = (text_rdd
                    .filter(lambda x: x is not None)
                    .map(lambda x: clean_word(x))  # Используем функцию очистки
                    .filter(lambda x: len(x) > 3)  # Убираем слова длиной 3 и менее
                    .filter(lambda x: len(x) > 0)  # Убираем пустые строки
                    .collect())

# Подсчет частоты слов
word_counts = (spark.sparkContext.parallelize(text_rdd_cleaned)  # Используем spark.sparkContext для создания RDD
               .map(lambda word: (word, 1))
               .reduceByKey(lambda x, y: x + y)
               .filter(lambda x: x[1] > 1))  # Оставляем только слова, встречающиеся более одного раза

# Преобразуем в DataFrame
word_counts_df = spark.createDataFrame(word_counts, ["word", "count"])

# Сортируем по убыванию частоты и выбираем топ-10
top_10_words = word_counts_df.orderBy("count", ascending=False).limit(10)

# Создаем красивую таблицу с помощью pandas
table_data = top_10_words.toPandas()
print("Топ-10 наиболее частых слов:")
print(table_data.to_string(index=False))

# Визуализация
plt.figure(figsize=(12, 6))
sns.barplot(
    data=table_data,
    x='count',
    y='word',
    hue='word',  # Явно указываем hue для устранения предупреждения
    palette='viridis',
    legend=False  # Отключаем легенду
)
plt.title('Топ-10 наиболее часто встречающихся слов', pad=20)
plt.xlabel('Частота')
plt.ylabel('Слово')
plt.tight_layout()
plt.show()

# sparksql

In [None]:
from pyspark.sql import SparkSession

# Создание SparkSession
spark = SparkSession.builder \
    .appName("SQL App") \
    .config("spark.hadoop.fs.defaultFS", "hdfs://localhost:9000") \
    .config("spark.ui.port", "4050") \
    .getOrCreate()

# Установка количества разделов для shuffle операций
spark.conf.set("spark.sql.shuffle.partitions", "50")

# Чтение данных из HDFS (текстовый файл)
file_path = "hdfs://localhost:9000/user5/sparkdir/sales.csv"
header_path = "hdfs://localhost:9000/user5/sparkdir/sales_header.csv"




In [None]:
def parse_row(line):
    # Парсим строку (разделение по запятой)
    return line.split(',')

# Чтение данных с использованием RDD
sales_rdd = spark.sparkContext.textFile(file_path).map(parse_row)

# Чтение заголовка из HDFS
sales_header_rdd = spark.sparkContext.textFile(header_path).take(1)  # Читаем первую строку заголовка
sales_header = sales_header_rdd[0].split(',')  # Разделяем по запятой


In [None]:
# Преобразуем RDD в DataFrame с заголовками
sales_df = spark.createDataFrame(sales_rdd, sales_header)

# Регистрация временной таблицы
sales_df.createOrReplaceTempView('sales')

# Выполнение SQL-запроса для получения всех данных
sales_total_df = spark.sql("SELECT * FROM sales")

# Печать первых 10 строк
sales_total_df.show(10)

In [None]:
# Визуализация распределения цен по странам
# Преобразуем DataFrame в pandas для визуализации
sales_total_df_pd = sales_total_df.toPandas()

# Построение графика для анализа цен по странам
plt.figure(figsize=(12, 6))
sns.boxplot(
    data=sales_total_df_pd,
    x='Country',  # Страна
    y='Price',    # Цена
    palette='viridis'
)

plt.title('Распределение цен по странам')
plt.xlabel('Страна')
plt.ylabel('Цена')
plt.xticks(rotation=90)  # Поворачиваем подписи на оси X для лучшей читаемости
plt.tight_layout()
plt.show()

In [None]:
# TODO: Вывод дохода по стране и штату
revenue_by_country_state = spark.sql("""
    SELECT Country, State, SUM(Price) as Total_Revenue
    FROM sales
    GROUP BY Country, State
    ORDER BY Total_Revenue DESC
""")

# Отображаем результаты
revenue_by_country_state.show()