# Финальный проект. Анализ данных такси.
## Неделя 1. Задание 1

### Импорт библиотек

In [1]:
import pandas as pd
import numpy as np
import scipy as sp
import matplotlib.pyplot as plt
import scipy.stats as sts
import os
import urllib.request
import sys
from datetime import datetime

np.set_printoptions(precision=5, floatmode='maxprec', suppress=True)
%matplotlib inline

### Оптимизация использования памяти
Гайд: https://www.dataquest.io/blog/pandas-big-data/

In [2]:
# Функция для конвертации в дату
def to_date (data, cols, frm = '%Y-%m-%d %H:%M:%S'):
    for i in cols:
        data[i] = pd.to_datetime(data[i], format=frm)
    return data

In [3]:
data = pd.read_csv('data/yellow_tripdata_2016-05.csv', nrows=1000)
# Выбираем данные для преобразования типов
data_float = data.select_dtypes(include=['float'])
data_int = data.select_dtypes(include=['int'])
# Преобразовываем типы
converted_int = data_int.apply(pd.to_numeric,downcast='unsigned')
converted_float = data_float.apply(pd.to_numeric,downcast='float')
# Заменяем столбцы в исходных данных
data[converted_int.columns] = converted_int
data[converted_float.columns] = converted_float
data.store_and_fwd_flag = data.store_and_fwd_flag.astype('category')
data = to_date(data, ['tpep_pickup_datetime', 'tpep_dropoff_datetime'])

print('Использование памяти ({} строк): {}'.format(len(data), data.memory_usage(deep = True).sum()/1024**2))
# Создаем словарь стобцов данных и оптимальныз типов
dtypes = data.drop(['tpep_pickup_datetime', 'tpep_dropoff_datetime'], axis = 1).dtypes
dtypes_col = dtypes.index
dtypes_type = [i.name for i in dtypes.values]
column_types = dict(zip(dtypes_col, dtypes_type))
column_types.update({'VendorID':'category', 'payment_type':'category'})
column_types_2014 = column_types.copy()
column_types_2014.pop('improvement_surcharge')
#column_types_2014.update({np.nan:np.nan})
#print(len(column_types), column_types)
#print(len(column_types_2014), column_types_2014)

Использование памяти (1000 строк): 0.09277725219726562


'float32'

### Необходимые фукции

In [5]:
def load_columns_list (all_dates, raw_data_urls):
    #base_url = 'https://s3.amazonaws.com/nyc-tlc/trip+data/yellow_tripdata_'
    #years = list(years)
    #months = list(months)
    #all_dates = [pd.to_datetime(str(y)+'.'+str(m)) for y in years for m in months]
    # Проверяем, есть ли файл с заголовками и есть ли в нем строки по нуными месяцам
    if not os.path.isfile('data_columns.csv'):
        wrong = 1
    else:
        indices = pd.to_datetime(pd.read_csv('data_columns.csv', usecols=[0], index_col=0).index)
        wrong = len([1 for date in all_dates if date not in indices])
    # Cчитаем (или загружаем) файл с заголовками
    if wrong > 0:
        print('Start generating new columns dataset')
        test = {str(y)+'.'+str(m):urllib.request.urlopen(url).readline().decode('utf-8').strip().split(',') 
                             for y,m,url,url_local in raw_data_urls}
        for i in test.keys():
            if   len(test[i]) != 19:    test[i].extend([np.nan]*(19-len(test[i])))
        ## Создаем датафрейм ссо всеми колонками в данных (протестирована для 2014-2016 годов)        
        data_columns = pd.DataFrame(test).transpose()
        data_columns.index = pd.to_datetime(data_columns.index)
        for i in data_columns:
            data_columns[i] = data_columns[i].str.strip()
        data_columns.to_csv('data_columns.csv')
        print('Columns are saved at local file "data_columns.csv"')
    else:
        data_columns = pd.read_csv('data_columns.csv', index_col=0)
        print('Columns are loaded from local file')
    try:
        up_to_2015_cols = list(data_columns.loc['2014-01-01'])[:-1]
        up_to_2015_cols[:3], up_to_2015_cols[7],up_to_2015_cols[-5]  = ['VendorID','tpep_pickup_datetime','tpep_dropoff_datetime'], 'RateCodeID', 'extra'
        return up_to_2015_cols
    except:
        print('Columns for dates before 2015 are not generated')
    
    

In [28]:
MIN_LAT, MAX_LAT, MIN_LON, MAX_LON = 40.49612, 40.91553, -74.25559, -73.70001
D_LAT = (MAX_LAT - MIN_LAT)/50.
D_LON = (MAX_LON - MIN_LON)/50.
def find_cell (df):
    cells = (50*np.floor((df.pickup_longitude-MIN_LON)/D_LON) + np.floor((df.pickup_latitude-MIN_LAT)/D_LAT) +1).astype(int)
    return cells

# Функция для нахождения ячейки, в которой находится точка
def find_point_cell(lon, lat):
    try: return(int(50*np.floor((lon-MIN_LON)/D_LON) + np.floor((lat-MIN_LAT)/D_LAT))+1)
    except IndexError: return ('Not in NYC')

In [24]:
# Функция для фильтрации датасета
def filter_data(df):
    df = df[(df.tpep_pickup_datetime != df.tpep_dropoff_datetime) &
                (df.passenger_count  != 0) &
                (df.trip_distance != 0) &
                (df.pickup_longitude >= MIN_LON) & (df.pickup_longitude <= MAX_LON) &
                (df.pickup_latitude >= MIN_LAT) & (df.pickup_latitude  <= MAX_LAT)]
    df.reset_index(drop=True, inplace =True)
    return df

In [21]:
def preprocess_data(df, start_date, n_hours, verbose = None):
    nrow_raw = len(df)
    df = filter_data(df)
    df.insert(column='hour',loc=len(df.columns),value=df.tpep_pickup_datetime.dt.floor('H'))
    df.insert(column='trip_duration',loc=len(df.columns),value=(df.tpep_dropoff_datetime - df.tpep_pickup_datetime).dt.seconds)
    df.insert(column='pickup_cell',loc=len(df.columns),value= pd.Series(find_cell(df),dtype='int16'))
    test = pd.DataFrame({'hour_num':range(n_hours), 'hour':pd.DatetimeIndex(start = start_date, freq = 'H', periods = n_hours)})
    df = df.merge(test, how='left', on = 'hour',)
    if verbose == 'short' or verbose == 'full':
        print('Было строк:{}. Осталось строк:{} ({:.2%})'.format(nrow_raw,len(df),(len(df) / nrow_raw)))
    return (df)


In [9]:
def calulate_pivot(df, start_date, n_hours, verbose = None):
    pivot = sts.binned_statistic_2d(x=df.pickup_cell, y=df.hour_num, values=None,statistic='count', 
                                range=[[1,2501], [0,n_hours]], bins=(2500, n_hours),
                                expand_binnumbers = False)
    ind = pd.DatetimeIndex(start = start_date, freq = 'H', periods = n_hours)
    pivot = pd.DataFrame(pivot[0], index=range(1, 2501), columns=ind)
    pivot = pivot.apply(pd.to_numeric, downcast = 'unsigned')
    
    # Проверка для тестирования
    if verbose == 'full':
        print('Размерность сводной таблицы:',pivot.shape)
        print('Сумма элементов сводной таблицы:', pivot.sum().sum())
        print('Кол-во строк в исходных данных:', len(df))
    return pivot

In [16]:
def process_urls(urls, verbose = 'short', nrows = None, return_data = False, return_pivot = False, save_to_disk = False):
    '''функция обрабатывает данные по списку ссылокБ созданных заранее. Она проверяет наличие файла на жестком диске,
    а затем фильтрует данные, рассчитывает новые столбцы и сводные таблицы (для работы необходимы функции filter_data, 
    preprocess_data и calculate_pivot).
    Затем обработанные файлы сохраняются обратно, а сводные таблицы сохраняются в отдельную папку
    Параметр verbose: принимает значения 'full', 'short', 'basic' - для полного вывода всех тестовых данных 
    подфункций, вывода промежуточных статусов обработки и вывода только основных этапов обработки для мониторинга процесса
    '''
    down_files = list(os.listdir('data')) # директория с данными (hardcoded)
    #up_to_2015_cols = load_columns_list
    for y, m, url, url_local in urls:        
        if url_local not in down_files: # переходим к следующему файлу, если текущий не загружен на жесткий диск
            print('!!! Data for {} is not downloaded\n'.format(str(m)+'.'+str(y)))
        elif y == 2016 and m>6:
            print('Data for {} has different structure and will not be analyzed'.format(str(m)+'.'+str(y)))
        else:
            n = datetime.now()
            print(n.strftime('%Y-%m-%d %H:%M:%S'))
            print(y, m)
            print('Start processing data for :', str(m)+'.'+str(y))
            start_date = pd.Timestamp(y, m, 1)
            n_hours = start_date.daysinmonth * 24
            if y < 2015:
                test = pd.read_csv('data/'+url_local, dtype=column_types_2014, names = up_to_2015_cols, nrows=nrows,
                                    header = 0, parse_dates=['tpep_pickup_datetime', 'tpep_dropoff_datetime'], 
                                    infer_datetime_format=True)  # для теста загружаются первые 10 строк
                #test.columns = up_to_2015_cols
                test.insert(18, 'improvement_surcharge', 0)
            else: 
                test = pd.read_csv('data/'+url_local, dtype=column_types, nrows=nrows,
                                    parse_dates=['tpep_pickup_datetime', 'tpep_dropoff_datetime'],
                                    infer_datetime_format=True,)  # для теста загружаются первые 10 строк
            if verbose == 'full': 
                print('Data was successfully read')
                print('Starting preprocessing data')
            # Расчет новых столбцов и сохранение данных
            test = preprocess_data(test, start_date, n_hours, verbose)
            
            
            # Расчет и сохранение сводных таблиц
            if verbose == 'full':
                print('Starting calculate pivot')
            pivot = calulate_pivot(test, start_date, n_hours, verbose)
            
            if save_to_disk:
                #test.to_csv('data/test/test_'+url_local,index=None)
                pivot.to_csv('data/pivots/pivot_'+url_local)
            
            print('Data was successfully processed')
            print('Время обработки файла:', str(datetime.now()- n),'\n')
            
            if return_data and len(urls) == 1 and return_pivot == False:
                return test
            if return_data == False and len(urls) == 1 and return_pivot:
                return pivot
            if return_data and len(urls)==1 and return_pivot:
                return test, pivot

### Подготовка для массового импорта данных

### Импорт данных

In [34]:
# Выбираем нужный период и генерируем список ссылок на файлы
regions = pd.read_csv('regions.csv', sep = ';')
min_date = '2014.06'
max_date = '2016.06'

raw_urls, periods = generate_urls(min_date, max_date)
up_to_2015_cols = load_columns_list(periods, raw_urls)

Columns are loaded from local file


In [35]:
raw_urls
#column_types_2014

[(2014,
  6,
  'https://s3.amazonaws.com/nyc-tlc/trip+data/yellow_tripdata_2014-06.csv',
  'yellow_tripdata_2014-06.csv'),
 (2014,
  7,
  'https://s3.amazonaws.com/nyc-tlc/trip+data/yellow_tripdata_2014-07.csv',
  'yellow_tripdata_2014-07.csv'),
 (2014,
  8,
  'https://s3.amazonaws.com/nyc-tlc/trip+data/yellow_tripdata_2014-08.csv',
  'yellow_tripdata_2014-08.csv'),
 (2014,
  9,
  'https://s3.amazonaws.com/nyc-tlc/trip+data/yellow_tripdata_2014-09.csv',
  'yellow_tripdata_2014-09.csv'),
 (2014,
  10,
  'https://s3.amazonaws.com/nyc-tlc/trip+data/yellow_tripdata_2014-10.csv',
  'yellow_tripdata_2014-10.csv'),
 (2014,
  11,
  'https://s3.amazonaws.com/nyc-tlc/trip+data/yellow_tripdata_2014-11.csv',
  'yellow_tripdata_2014-11.csv'),
 (2014,
  12,
  'https://s3.amazonaws.com/nyc-tlc/trip+data/yellow_tripdata_2014-12.csv',
  'yellow_tripdata_2014-12.csv'),
 (2015,
  1,
  'https://s3.amazonaws.com/nyc-tlc/trip+data/yellow_tripdata_2015-01.csv',
  'yellow_tripdata_2015-01.csv'),
 (2015,
  2,


In [37]:
#%%time
process_urls(urls=raw_urls, return_data=True, return_pivot=False, verbose='full', save_to_disk=True)#, nrows=100000)

2018-07-04 17:52:26
2014 6
Start processing data for : 6.2014
Data was successfully read
Starting preprocessing data
Было строк:13813029. Осталось строк:13374918 (96.83%)
Starting calculate pivot
Размерность сводной таблицы: (2500, 720)
Сумма элементов сводной таблицы: 13374918
Кол-во строк в исходных данных: 13374918
Data was successfully processed
Время обработки файла: 0:02:26.610997 

2018-07-04 17:54:52
2014 7
Start processing data for : 7.2014
Data was successfully read
Starting preprocessing data
Было строк:13106365. Осталось строк:12683644 (96.77%)
Starting calculate pivot
Размерность сводной таблицы: (2500, 744)
Сумма элементов сводной таблицы: 12683644
Кол-во строк в исходных данных: 12683644
Data was successfully processed
Время обработки файла: 0:02:15.319773 

2018-07-04 17:57:08
2014 8
Start processing data for : 8.2014
Data was successfully read
Starting preprocessing data
Было строк:12688877. Осталось строк:12280367 (96.78%)
Starting calculate pivot
Размерность сводной 

Data was successfully read
Starting preprocessing data
Было строк:12210952. Осталось строк:11968419 (98.01%)
Starting calculate pivot
Размерность сводной таблицы: (2500, 744)
Сумма элементов сводной таблицы: 11968419
Кол-во строк в исходных данных: 11968419
Data was successfully processed
Время обработки файла: 0:01:26.342155 

2018-07-04 18:33:53
2016 4
Start processing data for : 4.2016
Data was successfully read
Starting preprocessing data
Было строк:11934338. Осталось строк:11697034 (98.01%)
Starting calculate pivot
Размерность сводной таблицы: (2500, 720)
Сумма элементов сводной таблицы: 11697034
Кол-во строк в исходных данных: 11697034
Data was successfully processed
Время обработки файла: 0:01:29.908234 

2018-07-04 18:35:23
2016 5
Start processing data for : 5.2016
Data was successfully read
Starting preprocessing data
Было строк:11836853. Осталось строк:11626521 (98.22%)
Starting calculate pivot
Размерность сводной таблицы: (2500, 744)
Сумма элементов сводной таблицы: 11626521

### График количества поездок от ESB

In [38]:
# Empire State Building coordinates
esb_lat = 40.748412
esb_long = -73.985860
esb_cell = find_point_cell(esb_long, esb_lat)
print('Cell for Empire State Building:',esb_cell)

Cell for Empire State Building: 1231


In [39]:
pivot.sum().sum()

NameError: name 'pivot' is not defined

In [None]:
plt.grid()
plt.plot(range(744), pivot.loc[esb_cell,:])#, width = 1, align = 'edge', tick_label = range(24))

plt.title('Trips from ESB by hours')
plt.xlabel('$Hour$')
plt.ylabel('Trip   count')

### Подсчет количества пар ячейка-час, из которых не было поездок

In [None]:
(pivot==0).sum().sum()