# Part 14: Processing Large Datasets in pandas

## Introduction & Data

In [None]:
import pandas as pd
import numpy as np

In [None]:
moma = pd.read_csv('data/MoMAExhibitions1929to1989.csv')

In [None]:
moma.head(2)

In [None]:
moma.info()

## Measuring the memory usage of a Pandas DataFrame



### The Internal Representation of a Dataframe

<img src="./images/pandas_dataframe_blocks.png">

In [None]:
# _mgr is a private attribute holding the BlockManager; older pandas versions exposed it as _data
moma._mgr

### Dataframe Memory Footprint

#### Numbers (int, float) and other fixed-size objects

In [None]:
series = pd.Series([123] * 1_000_000, dtype=np.int64)

In [None]:
series.memory_usage()

In [None]:
series.memory_usage(deep=True)
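As a rough check of the cells above, the footprint of a fixed-size column should be the item size times the number of values, plus whatever the index needs. The following is a minimal sketch of that arithmetic; the variable names `values_bytes` and `with_index_bytes` are introduced here for illustration only.

```python
import numpy as np
import pandas as pd

# One million int64 values: each takes 8 bytes, whatever number is stored.
series = pd.Series([123] * 1_000_000, dtype=np.int64)

values_bytes = series.memory_usage(index=False)  # data only
with_index_bytes = series.memory_usage()         # data + index

print(values_bytes)                     # 8 bytes * 1,000,000 values
print(with_index_bytes - values_bytes)  # overhead of the default index

# deep=True makes no difference here: there are no Python objects to inspect.
print(series.memory_usage(deep=True) == with_index_bytes)
```

The exact index overhead depends on the pandas version, so it is printed rather than assumed.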

#### Example: moma dataset - float column

In [None]:
34558*8

In [None]:
moma['ExhibitionID'].nbytes

#### Object Columns (arbitrarily-sized objects)

<img src="./images/numpy_vs_python.png">

In [None]:
total_bytes = moma.size*8

In [None]:
total_megabytes = total_bytes/(1024*1024)

In [None]:
print('Total bytes:', total_bytes)
print('Total megabytes:', total_megabytes)

In [None]:
moma.info(memory_usage='deep')

In [None]:
moma.memory_usage(deep=True)

> **index** : *bool, default True* --> 
Specifies whether to include the memory usage of the DataFrame’s index in returned Series. If index=True, the memory usage of the index is the first item in the output.

In [None]:
moma.memory_usage(deep=True, index=False)

In [None]:
moma["ExhibitionTitle"].memory_usage()/1024/1024

In [None]:
moma["ExhibitionTitle"].memory_usage(deep=True)/1024/1024

In [None]:
import sys
(sum([sys.getsizeof(s) for s in moma["ExhibitionTitle"]]) + moma["ExhibitionTitle"].memory_usage())/1024/1024
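The difference between the shallow and the deep measurement can be reproduced on a small, made-up object column. This sketch assumes a 64-bit Python build (8-byte pointers); the titles are invented sample values, not rows from the MoMA file.

```python
import sys
import pandas as pd

# Hypothetical exhibition titles, stored explicitly as Python objects.
titles = pd.Series(["Painting in Paris", "Modern Architecture"] * 1000, dtype=object)

shallow = titles.memory_usage(index=False)         # only the pointers to the strings
deep = titles.memory_usage(index=False, deep=True) # pointers + the string objects
string_bytes = sum(sys.getsizeof(s) for s in titles)

print("shallow:", shallow)
print("deep:   ", deep)
print("strings:", string_bytes)
```

The shallow number only counts one pointer per row, which is why it badly underestimates the real footprint of text columns.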

### Getting memory usage by type

In [None]:
def get_memory_usage_by_type(df, types=None):
    if not types:
        types = []
        for column in df.columns:
            if hasattr(df[column], 'cat'):
                types.append('category')
            else:
                types.append(df[column].dtype)
        types = list(set(types))
    total = 0
    for dtype in types:
        selected_dtype = df.select_dtypes(include=[dtype])
        num_of_columns = len(selected_dtype.columns)
        mean_usage_b = selected_dtype.memory_usage(deep=True, index=False).mean()
        mean_usage_mb = mean_usage_b / 1024 ** 2
        sum_usage_b = selected_dtype.memory_usage(deep=True, index=False).sum()
        sum_usage_mb = sum_usage_b / 1024 ** 2
        print(f"Average memory usage: {round(mean_usage_mb, 3)} MB and total: {round(sum_usage_mb, 3)} MB for {num_of_columns}x {dtype} columns.")
        total += sum_usage_mb
        
    print('----------------------')
    print(f'Total memory usage: {round(total, 3)} MB')

In [None]:
get_memory_usage_by_type(moma)

## Optimizing Dataframe Memory Footprint

### Dropping columns

In [None]:
moma_sample = pd.read_csv("data/MoMAExhibitions1929to1989.csv", nrows=3)

In [None]:
moma_sample

In [None]:
# get the names of all columns
moma_sample.columns.tolist()

In [None]:
keep_cols = ['ExhibitionID', 'ExhibitionNumber', 'ExhibitionBeginDate', 
             'ExhibitionEndDate', 'DisplayName', 'Institution']

In [None]:
moma_small = pd.read_csv('data/MoMAExhibitions1929to1989.csv',
                    usecols=keep_cols)

In [None]:
get_memory_usage_by_type(moma_small)

### Optimizing Numeric Columns with Smaller Subtypes

<table>
<thead>
<tr>
<th>memory usage</th>
<th>float</th>
<th>int</th>
<th>uint</th>
<th>datetime</th>
<th>bool</th>
<th>object</th>
</tr>
</thead>
<tbody>
<tr>
<td>1 byte</td>
<td></td>
<td>int8</td>
<td>uint8</td>
<td></td>
<td>bool</td>
<td></td>
</tr>
<tr>
<td>2 bytes</td>
<td>float16</td>
<td>int16</td>
<td>uint16</td>
<td></td>
<td></td>
<td></td>
</tr>
<tr>
<td>4 bytes</td>
<td>float32</td>
<td>int32</td>
<td>uint32</td>
<td></td>
<td></td>
<td></td>
</tr>
<tr>
<td>8 bytes</td>
<td>float64</td>
<td>int64</td>
<td>uint64</td>
<td>datetime64</td>
<td></td>
<td></td>
</tr>
<tr>
<td>variable</td>
<td></td>
<td></td>
<td></td>
<td></td>
<td></td>
<td>object</td>
</tr>
</tbody>
</table>
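The sizes in the table can be checked directly with NumPy. This is a small sketch; the list `names` and the dictionary `sizes` are helper names introduced only for this example.

```python
import numpy as np

# Subtypes from the table above, written as NumPy dtype strings.
names = ["bool", "int8", "uint8", "float16", "int16", "uint16",
         "float32", "int32", "uint32", "float64", "int64", "uint64",
         "datetime64[ns]"]

# itemsize is the number of bytes one value of that dtype occupies.
sizes = {name: np.dtype(name).itemsize for name in names}

for name, size in sizes.items():
    print(f"{name:>15}: {size} byte(s)")
```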

In [None]:
int_types = ["int8", "int16", "int32", "int64"]
for it in int_types:
    print(np.iinfo(it))

In [None]:
np.iinfo('int8').min

In [None]:
np.iinfo('int8').max

#### Integer Columns

In [None]:
# Raises an error if the column contains missing values (NaN cannot be cast to int)
#moma['ConstituentID'].astype('int')

In [None]:
moma.select_dtypes(include=['float']).isnull().sum()

In [None]:
moma = pd.read_csv("data/MoMAExhibitions1929to1989.csv")
moma.select_dtypes(include=['float']).head()

In [None]:
get_memory_usage_by_type(moma)

In [None]:
moma.dtypes.value_counts()

In [None]:
# Convert each column to the smallest signed integer subtype that can hold its values
def convert_col_to_best_int_subtype(df, columns=None):
    for column in columns or []:
        col_max = df[column].max()
        col_min = df[column].min()
        # Try the subtypes from smallest to largest; the bounds are inclusive
        for subtype in ("int8", "int16", "int32", "int64"):
            limits = np.iinfo(subtype)
            if limits.min <= col_min and col_max <= limits.max:
                print(f"Column {column} converted to {subtype}.")
                df[column] = df[column].astype(subtype)
                break

In [None]:
convert_col_to_best_int_subtype(moma, ['ExhibitionSortOrder'])

In [None]:
moma.dtypes.value_counts()

In [None]:
get_memory_usage_by_type(moma)

#### Float Columns

In [None]:
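# Expected footprint in bytes, assuming the column was converted to int16:
# 2 bytes per value, plus 128 bytes assumed for the index
len(moma) * 2 + 128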

In [None]:
moma['ExhibitionSortOrder'].memory_usage(deep=True)

In [None]:
# Reset the dataframe to the original CSV
moma = pd.read_csv("data/MoMAExhibitions1929to1989.csv")

moma['ExhibitionSortOrder'] = moma['ExhibitionSortOrder'].astype('int')
moma['ExhibitionSortOrder'].dtype

In [None]:
# Reset the dataframe to the original CSV
moma = pd.read_csv("data/MoMAExhibitions1929to1989.csv")

moma['ExhibitionSortOrder'] = moma['ExhibitionSortOrder'].astype('int')
moma['ExhibitionSortOrder'] = pd.to_numeric(moma['ExhibitionSortOrder'], 
                                           downcast='integer')

moma['ExhibitionSortOrder'].dtype

In [None]:
# Reset the dataframe to the original CSV
moma = pd.read_csv("data/MoMAExhibitions1929to1989.csv")
moma['ExhibitionSortOrder'] = pd.to_numeric(moma['ExhibitionSortOrder'], 
                                            downcast='integer')

moma['ExhibitionSortOrder'].dtype
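The behaviour of `pd.to_numeric` with `downcast` can be seen more easily on tiny, made-up series than on the full dataset. The sketch below uses invented values; the variable names are illustrative only.

```python
import numpy as np
import pandas as pd

whole = pd.Series([1.0, 2.0, 120.0])      # floats that are all whole numbers
larger = pd.Series([1.0, 2.0, 300.0])     # 300 does not fit into int8
with_nan = pd.Series([1.0, np.nan, 3.0])  # NaN has no integer representation

whole_dtype = pd.to_numeric(whole, downcast="integer").dtype
larger_dtype = pd.to_numeric(larger, downcast="integer").dtype
nan_int_dtype = pd.to_numeric(with_nan, downcast="integer").dtype
nan_float_dtype = pd.to_numeric(with_nan, downcast="float").dtype

print(whole_dtype)      # smallest integer subtype that fits
print(larger_dtype)     # one step larger
print(nan_int_dtype)    # unchanged: the downcast is skipped when it is not possible
print(nan_float_dtype)  # float columns with NaN can still shrink to a smaller float
```

When a downcast is not possible, `pd.to_numeric` returns the data with its original dtype rather than raising, so it is worth checking `.dtype` afterwards.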

In [None]:
moma = pd.read_csv("data/MoMAExhibitions1929to1989.csv")
get_memory_usage_by_type(moma)
# convert int columns
convert_col_to_best_int_subtype(moma, ['ExhibitionSortOrder'])
# convert float columns
float_cols = moma.select_dtypes(include=['float'])
for col in float_cols.columns:
    moma[col] = pd.to_numeric(moma[col], downcast='float')

In [None]:
moma.dtypes.value_counts()

In [None]:
get_memory_usage_by_type(moma)

### Converting To DateTime

In [None]:
moma.head(3)

In [None]:
moma['ExhibitionEndDate'].isnull().sum()

In [None]:
moma['ExhibitionEndDate'] = pd.to_datetime(moma['ExhibitionEndDate'])

In [None]:
# passing an explicit format lets pandas skip format inference, which is usually faster
moma['ExhibitionBeginDate'] = pd.to_datetime(moma['ExhibitionBeginDate'], format='%m/%d/%Y')
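To see why the conversion saves memory, the same step can be tried on a small, invented date column. This is a sketch under the assumption that the dates follow the `%m/%d/%Y` pattern used above; the sample values are made up.

```python
import pandas as pd

# Hypothetical date strings stored as Python objects.
raw_dates = pd.Series(["11/07/1929", "12/07/1929", "01/13/1930"] * 1000, dtype=object)

parsed = pd.to_datetime(raw_dates, format="%m/%d/%Y")

raw_bytes = raw_dates.memory_usage(index=False, deep=True)
parsed_bytes = parsed.memory_usage(index=False, deep=True)

print(parsed.dtype)
print("as strings: ", raw_bytes, "bytes")
print("as datetime:", parsed_bytes, "bytes")  # 8 bytes per value
```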

In [None]:
moma.dtypes.value_counts()

In [None]:
get_memory_usage_by_type(moma)

### Converting to Categorical

In [None]:
moma['ConstituentType'].unique()

In [None]:
moma['ConstituentType'].value_counts()

In [None]:
moma['ConstituentType'].memory_usage(deep=True)

In [None]:
moma['ConstituentType'] = moma['ConstituentType'].astype('category')

In [None]:
get_memory_usage_by_type(moma)

In [None]:
moma['ConstituentType'].head()
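Under the hood, a categorical column stores each distinct value once and keeps a small integer code per row. The sketch below illustrates this on made-up data; the two labels are sample values chosen for the example and are not guaranteed to match the dataset.

```python
import pandas as pd

# Hypothetical low-cardinality column with 100,000 rows and two distinct values.
constituent = pd.Series(
    ["Individual", "Institution", "Individual", "Individual"] * 25_000,
    dtype=object,
)
as_category = constituent.astype("category")

print(list(as_category.cat.categories))        # each distinct value is stored once
print(as_category.cat.codes.head(4).tolist())  # one small integer per row
print(as_category.cat.codes.dtype)

object_bytes = constituent.memory_usage(index=False, deep=True)
category_bytes = as_category.memory_usage(index=False, deep=True)
print(object_bytes, "->", category_bytes, "bytes")
```

The saving shrinks as the number of distinct values grows, which is the reason for the unique-value ratio check in the loop that follows.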

In [None]:
obj_col = moma.select_dtypes(include=['object'])

In [None]:
for col in obj_col.columns:
    num_unique_values = len(moma[col].unique())
    num_total_values = len(moma[col])
    
    if num_unique_values / num_total_values < 0.5:
        moma[col] = moma[col].astype('category')

In [None]:
get_memory_usage_by_type(moma)

### Sparse columns

- https://pandas.pydata.org/pandas-docs/stable/user_guide/sparse.html
- https://pythonspeed.com/articles/pandas-load-less-data/
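As a minimal sketch of the idea behind the links above: when most values in a column are identical (here `0.0`), a sparse dtype stores only the values that differ from that fill value. The data below is invented for illustration.

```python
import numpy as np
import pandas as pd

# 10,000 values, of which only 10 are non-zero.
dense = pd.Series(np.zeros(10_000))
dense.iloc[::1000] = 1.0

# Store only the entries that differ from the fill value 0.0.
sparse = dense.astype(pd.SparseDtype("float64", fill_value=0.0))

dense_bytes = dense.memory_usage(index=False)
sparse_bytes = sparse.memory_usage(index=False)
density = sparse.sparse.density  # fraction of values that are actually stored

print(dense_bytes, "->", sparse_bytes, "bytes")
print("density:", density)
```

Sparse columns only pay off when the density is low; for mostly distinct values the extra position bookkeeping can make them larger than the dense version.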

### Example:  Selecting Types While Reading the Data In

In [None]:
moma_sample = pd.read_csv("data/MoMAExhibitions1929to1989.csv", nrows=3)

In [None]:
moma_sample

In [None]:
# get the names of all columns
moma_sample.columns.tolist()

In [None]:
keep_cols = ['ExhibitionID', 'ExhibitionNumber', 'ExhibitionBeginDate', 
             'ExhibitionEndDate', 'ExhibitionSortOrder', 'ExhibitionRole', 
             'ConstituentType', 'DisplayName', 'Institution', 'Nationality', 
             'Gender']

In [None]:
col_types = {'ExhibitionID': np.float32, 
             'ExhibitionNumber': 'category',
             'ExhibitionSortOrder': np.float16, 
             'ExhibitionRole': 'category', 
             'ConstituentType' : 'category', 
             'DisplayName' : 'category', 
             'Institution': 'category',  
             'Nationality' : 'category', 
             'Gender': 'category'}

In [None]:
date_cols = ["ExhibitionBeginDate", "ExhibitionEndDate"]

In [None]:
moma = pd.read_csv('data/MoMAExhibitions1929to1989.csv',
                    usecols=keep_cols,
                    parse_dates=date_cols,
                    dtype=col_types)

In [None]:
get_memory_usage_by_type(moma)
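The same pattern can be tried without the data file by reading from an in-memory string. This is a sketch with a few invented rows; the `Notes` column is hypothetical and exists only to show that `usecols` skips it.

```python
import io
import numpy as np
import pandas as pd

# A tiny made-up CSV with one column we do not want to load.
csv_text = """ExhibitionID,ExhibitionBeginDate,Gender,Notes
2557,11/07/1929,Male,ignored
2557,11/07/1929,Female,ignored
2558,12/11/1929,Male,ignored
"""

df = pd.read_csv(
    io.StringIO(csv_text),
    usecols=["ExhibitionID", "ExhibitionBeginDate", "Gender"],  # skip the rest
    dtype={"ExhibitionID": np.float32, "Gender": "category"},   # types chosen up front
    parse_dates=["ExhibitionBeginDate"],                         # parsed while reading
)

print(df.dtypes)
```

Choosing the types at read time avoids ever holding the larger default representation in memory.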

## Processing Dataframes in Chunks

### Processing Chunks

<img src="./images/processing_chunks_overview.png">

In [None]:
import pandas as pd
import numpy as np

In [None]:
chunk_iter = pd.read_csv("data/MoMAExhibitions1929to1989.csv", chunksize=10000)

In [None]:
print(type(chunk_iter))

In [None]:
for chunk in chunk_iter:
    print(len(chunk))

In [None]:
# Create an iterator object that reads in 250-row chunks from the MoMA exhibitions CSV.
chunk_iter = pd.read_csv("data/MoMAExhibitions1929to1989.csv", chunksize=250)

In [None]:
# For each chunk, retrieve the memory footprint in megabytes and append it to the list memory_footprints.
memory_footprints = []

for chunk in chunk_iter:
    memory_footprints.append(chunk.memory_usage(deep=True).sum()/(1024*1024))

In [None]:
# Generate and display a histogram of the values in memory_footprints using pyplot.hist()
import matplotlib.pyplot as plt

plt.hist(memory_footprints)
plt.show()

### Counting Across Chunks

In [None]:
# Create an iterator object that reads in 250-row chunks from the MoMA exhibitions CSV.
chunk_iter = pd.read_csv("data/MoMAExhibitions1929to1989.csv", chunksize=250)

In [None]:
# For each chunk, retrieve the number of rows and add it to num_rows.
num_rows = 0

for chunk in chunk_iter:
    num_rows += len(chunk) 

In [None]:
num_rows

### Batch Processing

<img src="./images/process_chunks_count.png">

In [None]:
series_list = [pd.Series([1,2]), pd.Series([2,3])]

pd.concat(series_list)

In [None]:
lifespans = []

In [None]:
dtypes = { 'ConstituentBeginDate': 'float',
          'ConstituentEndDate': 'float'}

chunk_iter = pd.read_csv("data/MoMAExhibitions1929to1989.csv", 
                         chunksize=250,
                        dtype=dtypes)

In [None]:
for chunk in chunk_iter:
    diff = chunk['ConstituentEndDate'] - chunk['ConstituentBeginDate']
    lifespans.append(diff)

In [None]:
lifespans_dist = pd.concat(lifespans)

In [None]:
lifespans_dist.head()

### Optimizing Performance

In [None]:
%%timeit
lifespans = []

chunk_iter = pd.read_csv("data/MoMAExhibitions1929to1989.csv", chunksize=250, 
                         dtype={"ConstituentBeginDate": "float", "ConstituentEndDate": "float"})

for chunk in chunk_iter:
    lifespans.append(chunk['ConstituentEndDate'] - chunk['ConstituentBeginDate'])

lifespans_dist = pd.concat(lifespans)

In [None]:
%%timeit
lifespans = []

chunk_iter = pd.read_csv("data/MoMAExhibitions1929to1989.csv", chunksize=500, 
                         dtype={"ConstituentBeginDate": "float", "ConstituentEndDate": "float"},  
                         usecols=['ConstituentBeginDate', 'ConstituentEndDate'])

for chunk in chunk_iter:
    lifespans.append(chunk['ConstituentEndDate'] - chunk['ConstituentBeginDate'])
    
lifespans_dist = pd.concat(lifespans)

### Counting Unique Values

<img src="./images/processing_chunks_value_counts.png">

In [None]:
chunk_iter = pd.read_csv("data/MoMAExhibitions1929to1989.csv",
                         chunksize=250, usecols=['Gender'])

In [None]:
overall_vc = []

for chunk in chunk_iter:
    chunk_vc = chunk['Gender'].value_counts()
    overall_vc.append(chunk_vc)
    
combined_vc = pd.concat(overall_vc) 

In [None]:
combined_vc.head(10)

### Combining Chunks Using GroupBy

In [None]:
chunk_iter = pd.read_csv("data/MoMAExhibitions1929to1989.csv",
                         chunksize=250, usecols=['Gender'])

overall_vc = []

for chunk in chunk_iter:
    chunk_vc = chunk['Gender'].value_counts()
    overall_vc.append(chunk_vc)
    
combined_vc = pd.concat(overall_vc) 

In [None]:
final_vc = combined_vc.groupby(combined_vc.index).sum()

In [None]:
final_vc
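The reason for the extra `groupby` step is easier to see on a tiny example: after `pd.concat`, every chunk contributes its own entry per label, so the same label appears several times. The following sketch uses a made-up six-row CSV held in memory.

```python
import io
import pandas as pd

# Six invented rows, read in chunks of two.
csv_text = "Gender\n" + "\n".join(["Male", "Female", "Male", "Male", "Female", "Male"])
chunk_iter = pd.read_csv(io.StringIO(csv_text), chunksize=2)

# One partial count per chunk.
partial_counts = [chunk["Gender"].value_counts() for chunk in chunk_iter]

# Concatenating keeps one entry per label *per chunk* ...
combined = pd.concat(partial_counts)
print(combined)

# ... so the partial counts are summed per label to get the overall totals.
final = combined.groupby(combined.index).sum()
print(final)
```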

## Analyzing big files with Pandas and SQLite

In [None]:
import sqlite3

In [None]:
conn = sqlite3.connect('data/moma.db')

In [None]:
moma_iter = pd.read_csv('data/moma.csv', chunksize=1000)

In [None]:
for chunk in moma_iter:
    chunk.to_sql('exhibitions', conn, if_exists='append', index=False)

### Computing Primarily in SQL

In [None]:
conn = sqlite3.connect('data/moma.db')

In [None]:
q = '''SELECT exhibitionid, count(*) AS counts 
    from exhibitions 
    GROUP BY exhibitionid 
    ORDER BY counts desc;'''

In [None]:
eid_counts = pd.read_sql(q, conn)

In [None]:
conn.close()

In [None]:
eid_counts.head(10)
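The load-then-query workflow above can be tried end to end without any files by using an in-memory SQLite database. This is a sketch with a few invented rows; `:memory:` and the `Title` column are assumptions made for the example.

```python
import io
import sqlite3
import pandas as pd

# A tiny made-up CSV standing in for the real file.
csv_text = "ExhibitionID,Title\n1,A\n1,B\n2,C\n"

conn = sqlite3.connect(":memory:")  # temporary database that lives only in RAM

# Load the CSV chunk by chunk, appending each chunk to the same table.
for chunk in pd.read_csv(io.StringIO(csv_text), chunksize=2):
    chunk.to_sql("exhibitions", conn, if_exists="append", index=False)

# Let SQLite do the aggregation and return only the small result.
q = """SELECT ExhibitionID, COUNT(*) AS counts
       FROM exhibitions
       GROUP BY ExhibitionID
       ORDER BY counts DESC;"""
eid_counts = pd.read_sql(q, conn)
conn.close()

print(eid_counts)
```

Because `if_exists='append'` adds rows on every run, re-running the load cell against a file-based database duplicates the data unless the table is dropped first.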

### Computing Primarily in Pandas

In [None]:
conn = sqlite3.connect('data/moma.db')

In [None]:
q = 'SELECT exhibitionid FROM exhibitions'

In [None]:
eid_counts = pd.read_sql(q, conn)

In [None]:
conn.close()

In [None]:
eid_counts.head()

In [None]:
eid_pandas_counts = eid_counts['ExhibitionID'].value_counts(dropna=False)

In [None]:
eid_pandas_counts.head(10)

### Reading in SQL Results Using Chunks

In [None]:
conn = sqlite3.connect('data/moma.db')

In [None]:
%%timeit
q = 'select exhibitionid from exhibitions;'
chunk_iter = pd.read_sql(q, conn, chunksize=100)

for chunk in chunk_iter:
    eid_pandas_counts = chunk['ExhibitionID'].value_counts()

In [None]:
%%timeit
q = 'select exhibitionid from exhibitions;'
chunk_iter = pd.read_sql(q, conn, chunksize=1000)

for chunk in chunk_iter:
    eid_pandas_counts = chunk['ExhibitionID'].value_counts()

In [None]:
%%timeit
q = 'select exhibitionid from exhibitions;'
chunk_iter = pd.read_sql(q, conn, chunksize=10000)

for chunk in chunk_iter:
    eid_pandas_counts = chunk['ExhibitionID'].value_counts()

## Exercise: Example Analysis of a Large Dataset

In [None]:
chunk_iter = pd.read_csv('data/crunchbase-investments.csv', 
                         chunksize=5000, 
                        encoding='ISO-8859-1')

In [None]:
mv_list = []

for chunk in chunk_iter:
    print(chunk.memory_usage(deep=True).sum() / (1024 * 1024))
    mv_list.append(chunk.isnull().sum())

In [None]:
combined_mv_vc = pd.concat(mv_list)

In [None]:
combined_mv_vc.head(5)

In [None]:
unique_combined_mv_vc = combined_mv_vc.groupby(combined_mv_vc.index).sum()

In [None]:
unique_combined_mv_vc.sort_values()

In [None]:
chunk_iter = pd.read_csv('data/crunchbase-investments.csv', 
                         chunksize=5000, 
                        encoding='ISO-8859-1')

In [None]:
counter = 0
series_memory_fp = pd.Series(dtype='float64')

In [None]:
for chunk in chunk_iter:
    if counter == 0:
        series_memory_fp = chunk.memory_usage(deep=True)
    else: 
        series_memory_fp += chunk.memory_usage(deep=True)
    counter += 1

In [None]:
# Drop memory footprint calculation for the index.
series_memory_fp = series_memory_fp.drop('Index')

In [None]:
series_memory_fp.sum() / (1024 * 1024)

In [None]:
chunk_iter = pd.read_csv('data/crunchbase-investments.csv', 
                         chunksize=5000, 
                        encoding='ISO-8859-1')
total_rows = 0
for chunk in chunk_iter:
    total_rows += len(chunk)

print(total_rows)

In [None]:
unique_combined_mv_vc.sort_values()/total_rows*100

In [None]:
# Drop columns that hold URLs or have too many missing values (>90% missing)
drop_cols = ['investor_permalink', 'company_permalink', 
             'investor_category_code']

In [None]:
keep_cols = chunk.columns.drop(drop_cols)

In [None]:
keep_cols.tolist()

In [None]:
# Key: Column name, Value: List of types
col_types = {}
chunk_iter = pd.read_csv('data/crunchbase-investments.csv', 
                         chunksize=5000, 
                         encoding='ISO-8859-1', 
                         usecols=keep_cols)

for chunk in chunk_iter:
    for col in chunk.columns:
        if col not in col_types:
            col_types[col] = [str(chunk.dtypes[col])]
        else:
            col_types[col].append(str(chunk.dtypes[col]))

In [None]:
uniq_col_types = {}
for k,v in col_types.items():
    uniq_col_types[k] = set(col_types[k])
uniq_col_types

In [None]:
import sqlite3
conn = sqlite3.connect('data/crunchbase.db')
chunk_iter = pd.read_csv('data/crunchbase-investments.csv', 
                         chunksize=5000, 
                         encoding='ISO-8859-1',
                         usecols=keep_cols)

for chunk in chunk_iter:
    chunk.to_sql("investments", conn, if_exists='append', index=False)

In [None]:
# check the data
q = "SELECT * FROM investments LIMIT 5;"
data_5 = pd.read_sql(q, conn)

In [None]:
data_5

In [None]:
q = '''SELECT company_category_code, count(*) AS counts 
    from investments 
    GROUP BY company_category_code 
    ORDER BY counts desc;'''

In [None]:
data = pd.read_sql(q, conn)

In [None]:
data.head()

In [None]:
data.head(10).plot(kind='bar', 
          x='company_category_code', 
          y='counts', 
          legend=False)

plt.show()

## More file formats

- [Big Data file formats](https://luminousmen.com/post/big-data-file-formats)

### [Parquet](https://parquet.apache.org/)

Launched in 2013, Parquet was developed by Cloudera and Twitter as a column-based storage format optimized for multi-column datasets. Because data is stored by column, it compresses well (compression algorithms perform better on data with low information entropy, which is typical of the values within a single column) and the files are splittable. The developers of the format claim that it is ideal for Big Data problems.

### [Avro](https://avro.apache.org/)

Apache Avro was released by the Hadoop working group in 2009. It is a row-based format that is highly splittable. It is also described as a data serialization system similar to Java Serialization. The schema is stored in JSON format while the data is stored in binary format, minimizing file size and maximizing efficiency. Avro has robust support for schema evolution by managing added fields, missing fields, and fields that have changed. This allows old software to read the new data and new software to read the old data, which is a critical feature if your data has the potential to change.

### [Feather](https://github.com/wesm/feather)

Feather provides binary columnar serialization for data frames. It is designed to make reading and writing data frames efficient, and to make sharing data across data analysis languages easy. Feather uses the Apache Arrow columnar memory specification to represent binary data on disk. This makes read and write operations very fast. This is particularly important for encoding null/NA values and variable-length types like UTF8 strings.

Feather is a part of the broader Apache Arrow project. Feather defines its own simplified schemas and metadata for on-disk representation.

### [HDF5](https://portal.hdfgroup.org/display/knowledge/What+is+HDF5)

HDF5 is a unique technology suite that makes possible the management of extremely large and complex data collections.

The HDF5 technology suite is designed to organize, store, discover, access, analyze, share, and preserve diverse, complex data in continuously evolving heterogeneous computing and storage environments.

HDF5 supports all types of data stored digitally, regardless of origin or size. Petabytes of remote sensing data collected by satellites, terabytes of computational results from nuclear testing models, and megabytes of high-resolution MRI brain scans are stored in HDF5 files, together with metadata necessary for efficient data sharing, processing, visualization, and archiving.