### Задание:
    
Допустим, имеются колоночные данные, которые находят в файле формата CSV. Необходимо реализовать класс (в конструкторе должен быть параметр - ссылка на файл), в котором будет метод эмулирующий работу функции pandas.DataFrame.describe в двух режимах: вычислений на python и на pySpark. Использовать библиотеку pandas и прочие библиотеки с аналогом функции describe нельзя. 

На выходе функции должен быть словарь с ключами - именами колонок и значениями - результатами вычислений.   

In [1]:
import csv
import pandas as pd
import numpy as np
from pyspark.sql import SparkSession

#### Данные для тестирования (csv-файл)

Задача на kaggle: https://www.kaggle.com/c/bike-sharing-demand

По историческим данным о прокате велосипедов и погодным условиям необходимо оценить спрос на прокат велосипедов.

В исходной постановке задачи доступно 11 признаков: https://www.kaggle.com/c/bike-sharing-demand/data

В наборе признаков присутсвуют вещественные, категориальные, и бинарные данные. 

Для демонстрации используется обучающая выборка из исходных данных train.csv, файлы для работы прилагаются.

In [12]:
path = r'C:\Users\hadjd\Jupyter_Notebooks\train.csv'

#### Работа оригинальной функции describe из библиотеки Pandas

In [13]:
df = pd.read_csv(path)
df.describe()

Unnamed: 0,season,holiday,workingday,weather,temp,atemp,humidity,windspeed,casual,registered,count
count,10886.0,10886.0,10886.0,10886.0,10886.0,10886.0,10886.0,10886.0,10886.0,10886.0,10886.0
mean,2.506614,0.028569,0.680875,1.418427,20.23086,23.655084,61.88646,12.799395,36.021955,155.552177,191.574132
std,1.116174,0.166599,0.466159,0.633839,7.79159,8.474601,19.245033,8.164537,49.960477,151.039033,181.144454
min,1.0,0.0,0.0,1.0,0.82,0.76,0.0,0.0,0.0,0.0,1.0
25%,2.0,0.0,0.0,1.0,13.94,16.665,47.0,7.0015,4.0,36.0,42.0
50%,3.0,0.0,1.0,1.0,20.5,24.24,62.0,12.998,17.0,118.0,145.0
75%,4.0,0.0,1.0,2.0,26.24,31.06,77.0,16.9979,49.0,222.0,284.0
max,4.0,1.0,1.0,4.0,41.0,45.455,100.0,56.9969,367.0,886.0,977.0


#### Реализация класса, эмулирующего работу функциции describe()

In [56]:
class as_pandas:
    #конструктор принимает на вход ссылку на файл
    def __init__(self, file_obj):
        self.file = file_obj
        
    #метод as_describe принимает на вход явно режим вычислений
    #'python' / 'pySpark'
    def as_describe(self, mode):
        self.mode = mode
        #чтение csv с помощью DictReader
        reader = csv.DictReader(self.file)
        columns = reader.fieldnames
        # словарь, ключи - колонки, значения - список всех значений
        dict_ = {}
        # множество для записи колонок, где есть значения, 
        # не преобразующиеся к float, т.е. категориальные значения
        # оригинальная функция describe не обрабатывает такие колонки
        col_with_no_digit_values = set()
        
        for line in reader:
            for col in columns:
                #пытаемся поместить в словарь новое значение, преобразовав во float
                try:
                    if col not in dict_.keys():
                        dict_[col] = [float(line[col])]
                    else:
                        dict_[col].append(float(line[col]))
                #если преобразование типов не срабатывает, заносим в множество
                except ValueError:
                        col_with_no_digit_values.add(col)
                        continue
        #удаляем из словаря колонки с категориальными признаками
        for col in  col_with_no_digit_values:
            if col in dict_.keys():
                del dict_[col]
                
        # выбор режима        
        if self.mode == 'python':
            
            dict_res = {}
            for key in dict_.keys():
                dict_res[key] = {'count':float(len(dict_[key]))}
                dict_res[key]['mean'] = np.mean(dict_[key])
                dict_res[key]['std'] = np.std(dict_[key],ddof=1)
                dict_res[key]['min'] = np.min(dict_[key])
                dict_res[key]['25%'] = np.quantile(dict_[key],0.25)
                dict_res[key]['50%'] = np.quantile(dict_[key],0.5)
                dict_res[key]['75%'] = np.quantile(dict_[key],0.75)
                dict_res[key]['max'] = np.max(dict_[key])
                
                print(key,dict_res[key], sep = '\n')
                
        # выбор режима 
        elif self.mode == 'pySpark':
            
            spark = SparkSession.builder.appName('PySpark').getOrCreate()
            sc = spark.sparkContext
            
            #из словаря dict_ делаем список кортежей, чтобы далее на его основе создавать rdd
            collection = []
            for key, val in dict_.items():
                collection.append((key,val))
            
            rdd = sc.parallelize(collection)

            result = rdd.map(lambda x: {x[0]:{'count':len(x[1]),
                         'mean':np.mean(x[1]),
                         'std':np.std(x[1],ddof=1),
                         'min':np.min(x[1]),
                         '25%':np.quantile(x[1],0.25),
                         '50%':np.quantile(x[1],0.5),
                         '75%':np.quantile(x[1],0.75),
                         'max':np.max(x[1]),}}).collect()
            
            #для приведения вывода к единому формату, преобразуем result (список словарей) к словарю с вложенными
            #словарями (аналогично вычислению в режиме 'python')
            dict_res = {}
            for el in result:
                for key, val in el.items():
                    dict_res[key] = val
                print(key,dict_res[key], sep = '\n')
                
        return dict_res

#### Тестирование функции as_describe из класса as_pandas

#### 1. Вычисления в режиме python

In [50]:
%%time
with open(path) as f:
    file = as_pandas(f)
    res = file.as_describe('python')

season
{'count': 10886.0, 'mean': 2.5066139996325556, 'std': 1.116174309344325, 'min': 1.0, '25%': 2.0, '50%': 3.0, '75%': 4.0, 'max': 4.0}
holiday
{'count': 10886.0, 'mean': 0.02856880396839978, 'std': 0.16659885062470958, 'min': 0.0, '25%': 0.0, '50%': 0.0, '75%': 0.0, 'max': 1.0}
workingday
{'count': 10886.0, 'mean': 0.6808745177291935, 'std': 0.4661591687997356, 'min': 0.0, '25%': 0.0, '50%': 1.0, '75%': 1.0, 'max': 1.0}
weather
{'count': 10886.0, 'mean': 1.418427337865148, 'std': 0.6338385858190958, 'min': 1.0, '25%': 1.0, '50%': 1.0, '75%': 2.0, 'max': 4.0}
temp
{'count': 10886.0, 'mean': 20.23085981995223, 'std': 7.791589843987567, 'min': 0.82, '25%': 13.94, '50%': 20.5, '75%': 26.24, 'max': 41.0}
atemp
{'count': 10886.0, 'mean': 23.655084052912, 'std': 8.474600626484948, 'min': 0.76, '25%': 16.665, '50%': 24.24, '75%': 31.06, 'max': 45.455}
humidity
{'count': 10886.0, 'mean': 61.88645967297446, 'std': 19.24503327739469, 'min': 0.0, '25%': 47.0, '50%': 62.0, '75%': 77.0, 'max': 

#### Сравним результаты с работой оригинальной функции describe(). 

Для этого создадим новый pandas.DataFrame на основе словаря, который возвращает метод as_describe класса as_pandas

In [51]:
df_check = pd.DataFrame.from_dict(res)
#переопределяем порядок индексов в новом DF, в соответствии с порядком
#ключей в вложенных словарях
df_check = df_check.reindex(res['season'].keys())
df_check

Unnamed: 0,season,holiday,workingday,weather,temp,atemp,humidity,windspeed,casual,registered,count
count,10886.0,10886.0,10886.0,10886.0,10886.0,10886.0,10886.0,10886.0,10886.0,10886.0,10886.0
mean,2.506614,0.028569,0.680875,1.418427,20.23086,23.655084,61.88646,12.799395,36.021955,155.552177,191.574132
std,1.116174,0.166599,0.466159,0.633839,7.79159,8.474601,19.245033,8.164537,49.960477,151.039033,181.144454
min,1.0,0.0,0.0,1.0,0.82,0.76,0.0,0.0,0.0,0.0,1.0
25%,2.0,0.0,0.0,1.0,13.94,16.665,47.0,7.0015,4.0,36.0,42.0
50%,3.0,0.0,1.0,1.0,20.5,24.24,62.0,12.998,17.0,118.0,145.0
75%,4.0,0.0,1.0,2.0,26.24,31.06,77.0,16.9979,49.0,222.0,284.0
max,4.0,1.0,1.0,4.0,41.0,45.455,100.0,56.9969,367.0,886.0,977.0


In [52]:
(df.describe() == df_check).all().all()

True

#### 2. Вычисления в режиме pySpark

In [53]:
%%time
with open(path) as f:
    file = as_pandas(f)
    res_2 = file.as_describe('pySpark')

season
{'count': 10886, 'mean': 2.5066139996325556, 'std': 1.116174309344325, 'min': 1.0, '25%': 2.0, '50%': 3.0, '75%': 4.0, 'max': 4.0}
holiday
{'count': 10886, 'mean': 0.02856880396839978, 'std': 0.16659885062470958, 'min': 0.0, '25%': 0.0, '50%': 0.0, '75%': 0.0, 'max': 1.0}
workingday
{'count': 10886, 'mean': 0.6808745177291935, 'std': 0.4661591687997356, 'min': 0.0, '25%': 0.0, '50%': 1.0, '75%': 1.0, 'max': 1.0}
weather
{'count': 10886, 'mean': 1.418427337865148, 'std': 0.6338385858190958, 'min': 1.0, '25%': 1.0, '50%': 1.0, '75%': 2.0, 'max': 4.0}
temp
{'count': 10886, 'mean': 20.23085981995223, 'std': 7.791589843987567, 'min': 0.82, '25%': 13.94, '50%': 20.5, '75%': 26.24, 'max': 41.0}
atemp
{'count': 10886, 'mean': 23.655084052912, 'std': 8.474600626484948, 'min': 0.76, '25%': 16.665, '50%': 24.24, '75%': 31.06, 'max': 45.455}
humidity
{'count': 10886, 'mean': 61.88645967297446, 'std': 19.24503327739469, 'min': 0.0, '25%': 47.0, '50%': 62.0, '75%': 77.0, 'max': 100.0}
windspe

#### Сравним результаты с работой оригинальной функции describe(). 

Для этого создадим новый pandas.DataFrame на основе словаря, который возвращает метод as_describe класса as_pandas

In [54]:
df_check_2 = pd.DataFrame.from_dict(res_2)
#переопределяем порядок индексов в новом DF, в соответствии с порядком
#ключей в вложенных словарях
df_check_2 = df_check_2.reindex(res_2['season'].keys())
df_check_2

Unnamed: 0,season,holiday,workingday,weather,temp,atemp,humidity,windspeed,casual,registered,count
count,10886.0,10886.0,10886.0,10886.0,10886.0,10886.0,10886.0,10886.0,10886.0,10886.0,10886.0
mean,2.506614,0.028569,0.680875,1.418427,20.23086,23.655084,61.88646,12.799395,36.021955,155.552177,191.574132
std,1.116174,0.166599,0.466159,0.633839,7.79159,8.474601,19.245033,8.164537,49.960477,151.039033,181.144454
min,1.0,0.0,0.0,1.0,0.82,0.76,0.0,0.0,0.0,0.0,1.0
25%,2.0,0.0,0.0,1.0,13.94,16.665,47.0,7.0015,4.0,36.0,42.0
50%,3.0,0.0,1.0,1.0,20.5,24.24,62.0,12.998,17.0,118.0,145.0
75%,4.0,0.0,1.0,2.0,26.24,31.06,77.0,16.9979,49.0,222.0,284.0
max,4.0,1.0,1.0,4.0,41.0,45.455,100.0,56.9969,367.0,886.0,977.0


In [55]:
(df.describe() == df_check_2).all().all()

True