# Общий ноутбук с полным решением продуктового анализа велопроката

## Команда

Название:

**Бигдатка**

Участники:
* **Калугин Константин** - Тимлид, ответственный за станции
* **Зейгман Константин** - Ответственный за байки
* **Муляр Никита** - Ответственный за целевую аудиторию

Ментор:

**Ермаков Егор**

## Предисловие

Необходимо разбить решение на следующие части:
1. Обработка сырого датасета, приведение всех данных к единому стандарту для более удобной работы и автоматизации вычислений
2. Разведочный анализ данных, чтобы понять, с какими данными можно поработать
3. Анализ станций
4. Определение портрета целевой аудитории
5. Анализ типов велосипедов
6. Проанализировать экономику велопроката
7. Сделать выводы

Анализ датасета будет состоять из двух частей: выдвижение гипотезы и ее подтверждение (или опровержение, что тоже хорошо)

## Решение

### Обработка датасета

#### Проблема

1. Таблицы за 2013-2019 года отличаются по присутствующим данным от таблиц за 2020-2022 год
2. В некоторых таблицах данные представлены с кавычками
3. В таблицах за 2013-2019 года некоторая информация о станциях находится в других файлах

#### Решение проблемы

Определяем библиотеки, которые будут задействованы для обработки датасета

In [None]:
from pyspark import SparkConf, SparkContext
import os
import datetime
import shutil

Определяем функции, которые будут задествованы для стандартизации записи данных

In [None]:
def filter_first_2013_2019(line):
    try:
        first_elem = int(line[0])
        return True
    except:
        return False


def filter_first_2020_2023(line):
    if line[0] == 'ride_id':
        return False
    return True


def del_quotation_marks(elems):
    for i in range(len(elems)):
        if '"' in elems[i]:
            elems[i] = elems[i][1:-1]
    return elems


def map_2013_2019(elems):
    try:
        if '/' not in elems[1]:
            if len(elems[1].split()[1]) != 8:
                return ','.join([elems[0], '', datetime.datetime.strptime(elems[1], "%Y-%m-%d %H:%M").isoformat(),
                                 datetime.datetime.strptime(elems[2], "%Y-%m-%d %H:%M").isoformat(), elems[-6],
                                 elems[-4], '', '', '', '', elems[-3], elems[-2],
                                 str(datetime.datetime.strptime(elems[1], "%Y-%m-%d %H:%M").year - int(elems[-1])) if elems[-1] else '', elems[3]])
            else:
                return ','.join([elems[0], '', datetime.datetime.strptime(elems[1], "%Y-%m-%d %H:%M:%S").isoformat(),
                                 datetime.datetime.strptime(elems[2], "%Y-%m-%d %H:%M:%S").isoformat(), elems[-6],
                                 elems[-4], '', '', '', '', elems[-3], elems[-2],
                                 str(datetime.datetime.strptime(elems[1], "%Y-%m-%d %H:%M:%S").year - int(elems[-1])) if elems[-1] else '', elems[3]])
        else:
            if elems[1][-3] == ':' and elems[1][-6] == ':':
                return ','.join([elems[0], '', datetime.datetime.strptime(elems[1], "%m/%d/%Y %H:%M:%S").isoformat(),
                                 datetime.datetime.strptime(elems[2], "%m/%d/%Y %H:%M:%S").isoformat(), elems[-6],
                                 elems[-4], '', '', '', '', elems[-3], elems[-2],
                                 str(datetime.datetime.strptime(elems[1], "%m/%d/%Y %H:%M:%S").year - int(elems[-1])) if elems[-1] else '', elems[3]])
            else:
                return ','.join([elems[0], '', datetime.datetime.strptime(elems[1], "%m/%d/%Y %H:%M").isoformat(),
                             datetime.datetime.strptime(elems[2], "%m/%d/%Y %H:%M").isoformat(), elems[-6],
                             elems[-4], '', '', '', '', elems[-3], elems[-2],
                             str(datetime.datetime.strptime(elems[1], "%m/%d/%Y %H:%M").year - int(elems[-1])) if elems[-1] else '', elems[3]])
    except:
        return ''


def map_2020_2023(elems):
     try:
        return ','.join([elems[0], elems[1], datetime.datetime.strptime(elems[2], "%Y-%m-%d %H:%M:%S").isoformat(),
                         datetime.datetime.strptime(elems[3], "%Y-%m-%d %H:%M:%S").isoformat(),
                         elems[4], elems[6], elems[8], elems[9],
                         elems[10], elems[11], elems[12], '', '', ''])
     except:
         return ''

Определяем функции, которые будут задействованы для получения координат станций (требуется для 2013-2019 годов)

In [None]:
def mapper_st(line):
    info = line.split(",")
    if info[0].isnumeric() and info[1] != "" and info[2] != "" and info[3] != "":
        yield (info[0], (info[1], info[2], info[3]))

def mapper_tr(line):
    id_, type_, started_at, ended_at, start_name, end_name, start_lat, start_lng, end_lat, end_lng, member_casul, gender, age, bikeid = line.split(",")
    return {
        "id": int(id_),
        "type": type_,
        "started_at": started_at,
        "ended_at": ended_at,
        "start_name": start_name,
        "end_name": end_name,
        "member_casul": member_casul,
        "gender": gender,
        "age": age,
        "bikeid": bikeid
    }

def mapper_inter(line):
    from_name, (data, station) = line
    return {
        "id": data["id"],
        "type": data["type"],
        "started_at": data["started_at"],
        "ended_at": data["ended_at"],
        "start_name": data["start_name"],
        "end_name": data["end_name"],
        "start_lat": station["latitude"] if station else "",
        "start_lng": station["longitude"] if station else "",
        "member_casul": data["member_casul"],
        "gender": data["gender"],
        "age": data["age"],
        "bikeid": data["bikeid"]
    }

def mapper_st_final(x):
    return {
        "name": x[1][0],
        "latitude": x[1][1],
        "longitude": x[1][2]
    }

def finalise(line):
    to_name, (data, station) = line
    return ",".join([str(x) for x in [
        data["id"],
        data["type"],
        data["started_at"],
        data["ended_at"],
        data["start_name"],
        data["end_name"],
        data["start_lat"],
        data["start_lng"],
        station["latitude"] if station else "",
        station["longitude"] if station else "",
        data["member_casul"],
        data["gender"],
        data["age"],
        data["bikeid"]
    ]])


Определяем функцию, которая реализует описанные выше функции для стандартизации

In [None]:
def get_data():
    directory ='sources/'
    # 2013 - 2019
    for i in range(2013, 2020):
        files = None
        try:
            for filename in os.listdir(directory):
                if str(i) not in filename:
                    continue
                file = sc.textFile(os.path.join(directory, filename)) \
                    .map(lambda line: line.split(',')) \
                    .map(del_quotation_marks) \
                    .filter(filter_first_2013_2019)
                if files is None:
                    files = file
                else:
                    files = files.union(file)
            res = files.map(map_2013_2019).filter(lambda x: x != '')
            res.coalesce(1).saveAsTextFile(f'clean_sources/{i}')
        except:
            pass
    # 2020 - 2023
    for i in range(2020, 2024):
        files = None
        try:
            for filename in os.listdir(directory):
                if str(i) not in filename:
                    continue
                file = sc.textFile(os.path.join(directory, filename)) \
                    .map(lambda line: line.split(',')) \
                    .map(del_quotation_marks) \
                    .filter(filter_first_2020_2023)
                if files is None:
                    files = file
                else:
                    files = files.union(file)
            res = files.map(map_2020_2023).filter(lambda x: x != '')
            res.coalesce(1).saveAsTextFile(f'clean_sources/{i}')
        except:
            pass

Определяем цункцию, которая будет использоваться для получения координат и добавления ее к датасету

In [None]:
def bind_data():
    # собрать данные о станциях
    stations = sc.textFile("stations").flatMap(mapper_st) \
                                      .reduceByKey(lambda a, b: a) \
                                      .map(mapper_st_final)
    stations_key = stations.keyBy(lambda x: x["name"])
    for y in range(2013, 2019 + 1):
        try:
            df = sc.textFile(f"clean_sources/{y}").map(mapper_tr)
            df_from = df.keyBy(lambda x: x["start_name"])
            inter = df_from.leftOuterJoin(stations_key) \
                .map(mapper_inter)
            df_to = inter.keyBy(lambda x: x["end_name"])
            final = df_to.leftOuterJoin(stations_key) \
                .map(finalise) \
                .coalesce(1) \
                .saveAsTextFile(f"binded_data/{y}")
        except:
            pass

Собираем все полученные таблицы в одной папке, остальные удаляем

In [None]:
def unity():
    os.makedirs('data/src')
    for i in range(2013, 2020):
        os.rename(f'binded_data/{i}', f'data/src/{i}')
    for i in range(2020, 2024):
        os.rename(f'clean_sources/{i}', f'data/src/{i}')
    shutil.rmtree('binded_data')
    shutil.rmtree('clean_sources')

Запускаем обработку данных

In [None]:
conf = SparkConf().setAppName('test').setMaster('local')
sc = SparkContext(conf=conf)
get_data()
bind_data()
unity()

В итоге мы оставляем следующие колонки:
1. id - уникальный идентификатор поездки
2. biketype - тип велосипеда
3. starttime - время начала поездки
4. endtime - время окончания поездки
5. startstation - название стартовой станции
6. endstation- название конечной станции
7. startlat, startlng - координаты стартовой станции
8. endlat, endlng - координаты конечной станции
9. member_casual - тип пользователя
10. age - возраст пользователя
11. gender - пол пользователя
12. bikeid - уникальный идентификатор велосипеда

### Разведочный анализ

#### Цели

1. Определить основные направления исследований
2. Определить какая информация может отсутствовать в различные временные интервалы
3. Определить, какие временные интервалы подходят под те или иные направления

#### Достижение цели

Посмотрим, как выглядят данные за различные годы

In [None]:
for year in range(2013, 2024):
    df = pd.read_table(f'data/src/{year}/part-00000', index_col=False, header=None, names=['id', 'biketype', 'starttime', 'endtime', 'startstation', 'endstation', 'startlat', 'startlng', 'endlat', 'endlng', 'member_casual', 'gender', 'age', 'bikeid'], sep=',')
    print(year)
    print(df.head())
    print()

Заметим, что помимо очевидных пропусков в данных за различные периоды, можно заметить, что члены и нечлены помечаются по-разному.

Проверим это ниже

In [None]:
for year in range(2013, 2024):
    df = pd.read_table(f'data/src/{year}/part-00000', index_col=False, header=None, names=['id', 'biketype', 'starttime', 'endtime', 'startstation', 'endstation', 'startlat', 'startlng', 'endlat', 'endlng', 'member_casual', 'gender', 'age', 'bikeid'], sep=',')
    print(year)
    print(df['member_casual'].unique())
    print()

Члены могут быть отмечены в логах как "member" или как "Subscriber", а нечлены - как "Customer" или как "casual". Также один раз встречается "Depended", но он не представляет для нас никакого интереса в конечном счете

Определим, как менялось с годами количество станций

In [None]:
counts_stations = []
for year in range(2013, 2024):
    df = pd.read_table(f'data/src/{year}/part-00000', index_col=False, header=None, names=['id', 'biketype', 'starttime', 'endtime', 'startstation', 'endstation', 'startlat', 'startlng', 'endlat', 'endlng', 'member_casual', 'gender', 'age', 'bikeid'], sep=',')
    counts_stations.append(len(np.unique(np.concatenate((df['startstation'].dropna().unique(), df['endstation'].dropna().unique()), axis=None))))

In [None]:
plt.plot(range(2013, 2024), counts_stations)

Как мы видим по графику, количество станций растет. Падение в конце же обусловлено тем, что мы имеем за последний год только 3 месяца, вместо 12, из-за было собрано мало данных

Однако именно на информации за 2023 мы будем основываться в анализе станций, т.к. только с ними мы можем иметь высокую уверенность в том, что станции не были закрыты (к тому же по выборка достаточно большая, что делает ее репрезентативной)

Посмотрим на то, как могут строиться маршруты

In [None]:
import networkx as nx

In [None]:
df = pd.read_table(f'data/src/2023/part-00000', index_col=False, header=None, names=['id', 'biketype', 'starttime', 'endtime', 'startstation', 'endstation', 'startlat', 'startlng', 'endlat', 'endlng', 'member_casual', 'gender', 'age', 'bikeid'], sep=',')

In [None]:
stations = df[['startstation', 'endstation']]
stations.rename(columns={'startstation': 'source', 'endstation': 'target'})
graph = nx.Graph()
graph.add_edges_from(stations.to_numpy())

In [None]:
plt.figure(figsize=(15,15))
options = {"node_color": "black", "node_size": 25, "linewidths": 0, "width": 0.1}
pos = nx.spring_layout(graph, seed=1969)
nx.draw(graph, pos, **options)

Как мы видим, в логгах встречаются прогулочные сессии (т.е. человек начал ее и закончил в одном пункте), а также на гарфе мы видим висячие вершины, что говорит о том, что некоторые станции использовались для связи лишь с одним пунктом.

### Целевая аудитория

### Велосипеды

Определяем функции, которые необходимы для вычисления 

In [None]:
def get_shortest_distance(lat1, lon1, lat2, lon2):
    return acos(sin(lat1) * sin(lat2) + cos(lat1) * cos(lat2) * cos(lon2 - lon1)) * 6371


def parse_table1(line):
    id_, type_, startt, endt, startn, endn, startlt, startln, endlt, endln, member, gender, age, bikeid = line.split(",")
    return (bikeid, 1)


def parse_table2(line):
    id_, type_, startt, endt, startn, endn, startlt, startln, endlt, endln, member, gender, age, bikeid = line.split(",")
    V_AVG = 12/3.6/1000
    startt = datetime.datetime.strptime(startt, '%Y-%m-%dT%H:%M:%S')
    endt = datetime.datetime.strptime(endt, '%Y-%m-%dT%H:%M:%S')
    duration = endt - startt
    return (bikeid, V_AVG*duration.total_seconds())


def parse_table3(line):
    id_, type_, startt, endt, startn, endn, startlt, startln, endlt, endln, member, gender, age, bikeid = line.split(",")
    return (bikeid, startt[0:7])

In [None]:
conf = SparkConf().setAppName('test').setMaster('local')
sc = SparkContext(conf=conf)

In [None]:
REPAIR_LIM = 10000
REPAIR_PRICE = 60
eREPAIR_PRICE = 80
BIKE_PRICE = 1500
eBIKE_PRICE = 2000

In [None]:
unique_by_year = []
for y in range(2013, 2019 + 1):
    df = sc.textFile(f"data/src/{y}").map(parse_table1) \
                                          .reduceByKey(lambda a, b: a + b) \
                                          .map(lambda x: (None, 1)) \
                                          .reduceByKey(lambda a, b: a + b) \
                                          .collect()
    unique_by_year.append((y, df[0][1]))
print("Уникальные велосипеды по годам:", unique_by_year)

In [None]:
cum_repairs = []
run = []
big = sc.parallelize([])
for y in range(2013, 2019 + 1):
    df = sc.textFile(f"data/src/{y}").map(parse_table2) \
                                          .reduceByKey(lambda a, b: a + b)
    big = big.union(df)
    analyse_rep = big.map(lambda x: (None, floor(x[1] / REPAIR_LIM))) \
                 .reduceByKey(lambda a, b: a + b) \
                 .collect()
    cum_repairs.append((y, analyse_rep[0][1]))
    analyse_run = df.map(lambda x: (None, x[1])) \
                    .reduceByKey(lambda a, b: a + b) \
                    .collect()
    run.append((y, analyse_run[0][1]))
print("Накопительное количество ремонтов по годам: ", cum_repairs)
print("Пробег по годам: ", run)

In [None]:
cum = 0
repairs = []
for y, s in cum_repairs:
    repairs.append((y, s - cum))
    cum = s
print("Количество ремонтов по годам: ", repairs)
repair_prices = [(y, x * 0.6 * eREPAIR_PRICE + x * 0.4 * REPAIR_PRICE) for y, x in repairs]
print("Затраты на ремонт по годам: ", repair_prices)
print("Средние затраты по годам: ", sum([x for y, x in repair_prices]) / len(list(filter(lambda x: x[1] != 0, repair_prices))))

In [None]:
big = sc.parallelize([])
for y in range(2013, 2019 + 1):
    df = sc.textFile(f"data/src/{y}").map(parse_table3)
    big = big.union(df)

unique = big.reduceByKey(lambda a, b: a if a < b else b) \
            .map(lambda x: (x[1], 1)) \
            .reduceByKey(lambda a, b: a + b) \
            .collect()
unique_by_year2 = []
for y in range(2013, 2019 + 1):
    summa = 0
    for d, x in unique:
        if str(y) in d:
            summa += x
    unique_by_year2.append((y, summa))
print("Новые уникальные велосипеды по месяцам:", unique)
print("Новые уникальные велосипеды по годам:", unique_by_year2)
avg_bike = sum([x for y, x in unique_by_year2]) / len(unique_by_year2)
print("Среднее новых по годам: ", avg_bike)
print("Средние затраты на велосипеды: ", avg_bike * 0.6 * eBIKE_PRICE + avg_bike * 0.4 * BIKE_PRICE)

In [None]:
pdf = pd.DataFrame(unique_by_year)
plt.plot(pdf[0], pdf[1], c="green")
plt.title("Количество уникальных байков по годам")
plt.ylabel("Количество")
plt.xlabel("Год")

In [None]:
pdf = pd.DataFrame(repairs)
plt.plot(pdf[0], pdf[1], c="blue")
plt.title("Количество ремонтов байков по годам")
plt.ylabel("Количество")
plt.xlabel("Год")

In [None]:
pdf = pd.DataFrame(unique_by_year2)
plt.plot(pdf[0], pdf[1], c="red")
plt.title("Новые уникальные велосипеды по годам")
plt.ylabel("Количество")
plt.xlabel("Год")

In [None]:
pdf = pd.DataFrame(run)
plt.plot(pdf[0], pdf[1], c="green")
plt.title("Пробег по годам")
plt.ylabel("Путь (км)")
plt.xlabel("Год")

Какие выводы можно сделать из графиков за 2013-2019 года?
1. Велосипедная сеть росла, лишь в 2019 году наблюдается незначительный спад количества уникальных байков
2. В 2016 наблюдается небольшой спад пользования сети (подтверждение о чем можно найти в новостных лентах)
3. График ремонта байков будет цикличен. Divvy основан в 2013 году, в этом году введена большая часть велосипедов этой сети (вторая часть была введена в эксплуатацию после расширения сети в 2015 году). Значит, по истечении 2-х лет количество ремонтов байков достигнет пика, затем снова пойдет спад. В 2018 году идет наложение ремонта сразу двух "подгрупп" велосипедов (2013 + 2015) => Нужно вводить велосипеды в использование равномерно?

### Станции

In [None]:
stations = df[['id', 'startstation', 'biketype']].rename(columns={'startstation': 'station'}).groupby(by=['station']).agg({'id': 'count', 'biketype': pd.Series.mode}).reset_index()

In [None]:
stations.sort_values(by=['id'], ascending=False).rename(columns={"id":"count"})[:10]

In [None]:
stations.sort_values(by=['id'], ascending=False).rename(columns={"id":"count"})[:10]

### Экономика

## Заключение