In [1]:
from typing import List
from pyspark.sql import SparkSession, DataFrame
from pyspark.sql import types as T
from pyspark.sql import functions as F
from collections import Counter
from pyspark import SparkContext, RDD
from csv import reader
import itertools
import rdd_util
import importlib

In [2]:
from pyspark import SparkContext
spark = SparkSession \
    .builder \
    .appName("project") \
    .config("spark.some.config.option", "some-value") \
    .getOrCreate()

In [3]:
from typing import List
from pyspark.sql import SparkSession, DataFrame
from pyspark.sql import types as T
from pyspark.sql import functions as F
from collections import Counter
from pyspark import SparkContext, RDD
from csv import reader
import itertools

from dateutil import parser

def is_date(s: str, col: str):
    col_name = col.lower()
    if 'date' in col_name or 'year' in col_name \
            or 'time' in col_name or 'month' in col_name \
            or 'day' in col_name:
        return True
    else:
        return False
    
def mapd(x: List):
    """
    TODO: check date type
    :param x:
    :return:
    """
    # [col_idx, (value, type)]
    res = (x[0], [x[1], None])
    if (x[1] == ''):
        res[1][1] = 'empty'
    elif(is_date(x[1],x[0])):
        res[1][1] = 'date'
    elif (is_int(x[1])):
        res[1][1] = 'int'
        res[1][0] = int(res[1][0])
    elif (is_float(x[1])):
        res[1][1] = 'real'
        res[1][0] = float(res[1][0])
    else:
        res[1][1] = 'text'
    return res


def is_int(s: str):
    try:
        int(s)
        return True
    except ValueError:
        return False


def is_float(value: str):
    if '.' not in value:
        return False
    try:
        float(value)
        return True
    except ValueError:
        return False


def generate_meta(spark: SparkSession, path: str):
    # read dataframe
    sc = SparkContext.getOrCreate()
    # Add index to each row, [([...], 0),([...], 1)...]
    rdd = sc.textFile(path, 1).mapPartitions(lambda x: reader(x, delimiter='\t')).zipWithIndex()
    header = rdd.filter(lambda x: x[1] == 0) \
        .map(lambda x: (x[0])).collect()[0]  # extract the first part, ignore idx
    rows = rdd.filter(lambda x: x[1] != 0).map(lambda x: x[0])
    file_name = path.split('/')[-1]
    metadata = {
        'dataset_name': file_name,
        'key_column_candidates': header
    }
    N = len(header)
    # Transform to [(col_idx, value),(col_idx, value)...]
    items = rows.flatMap(
        lambda x, h=header: [(h[i], x[i]) for i in range(N)])

    # Transform to [(col_idx, (value, type)),(col_idx, (value, type))...]
    mapped_items = items.map(mapd)
    col_map = {}
    for col in header:
        col_map[col] = {}

    res2 = generate_distinct_top5(items)
    res1 = generate_null_empty(mapped_items)
    # [(col,non-empty, empty, total, distinct_num, top5:(col_name,freq))]
    flat_res = res1.join(res2).map(lambda x: (x[0], (*x[1][0], *x[1][1]))).collect()
    columns = []
    for res in flat_res:
        column_data = {
            'column_name': res[0],
            'number_non_empty_cells': res[1][0],
            'number_empty_cells': res[1][1],
            'number_distinct_values': res[1][3],
            'frequent_values': [x[0] for x in res[1][4]]
        }
        columns.append(column_data)
    metadata['columns'] =columns
    return metadata



def generate_null_empty(mapped_items: RDD) -> RDD:
    """
    :param mapped_items: [(col,(value, type)), ...]
    :return: [(col1,[non-empty, empty, total]), (col2,[null-empty, empty, total])]
    """

    def seqFunc(local, x):
        res = [i for i in local];
        if (x[1] != 'empty'):
            res[0] = local[0] + 1
        else:
            res[1] = local[1] + 1
        res[2] = local[2] + 1
        return res

    combFunc = (lambda x, y: (x[0] + y[0], x[1] + y[1], x[2] + y[2]))
    count = mapped_items.aggregateByKey((0, 0, 0), seqFunc, combFunc)
    return count


def generate_distinct_top5(items: RDD) -> RDD:
    """
    :param items: [(col,value),...]
    :return: [(col,(distinct_num, [top5...])),(col,(distinct_num, [top5...])),...]
    """
    freq_items = items.map(lambda x: ((x[0], x[1]), 1)) \
        .aggregateByKey((0, 0),
                        (lambda x, y: (0, x[1] + 1)),
                        (lambda x, y: (x[1] + y[1]))) \
        .map(lambda x: ((x[0][0]), (x[0][1], x[1][1])))
    sorted_grouped_freq_items = freq_items.sortBy(lambda x: x[1][1], ascending=False).groupByKey()
    res = sorted_grouped_freq_items.mapValues(lambda x: (len(x), list(itertools.islice(x, 5))))
    return res



In [4]:
path = '/user/hm74/NYCOpenData/2bmr-jdsv.tsv.gz'

In [5]:
%%time
# read dataframe
sc = SparkContext.getOrCreate()
# Add index to each row, [([...], 0),([...], 1)...]
rdd = sc.textFile(path, 10).mapPartitions(lambda x: reader(x, delimiter='\t')).zipWithIndex()
header = rdd.filter(lambda x: x[1] == 0) \
    .map(lambda x: (x[0])).collect()[0]  # extract the first part, ignore idx
rows = rdd.filter(lambda x: x[1] != 0).map(lambda x: x[0])
file_name = path.split('/')[-1]
metadata = {
    'dataset_name': file_name,
    'key_column_candidates': header
}
N = len(header)
# Transform to [(col_idx, value),(col_idx, value)...]
items = rows.flatMap(
    lambda x, h=header: [(h[i], x[i]) for i in range(N)])

# Transform to [(col_idx, (value, type)),(col_idx, (value, type))...]
mapped_items = items.map(mapd).cache()

CPU times: user 19 ms, sys: 4 ms, total: 23 ms
Wall time: 2.97 s


In [6]:
mapped_items.take(10)

[('Base_Number', ['B02756', 'text']),
 ('Wave_Number', [3, 'int']),
 ('Base_Name', ['ALLY CAR SERVICE LLC', 'text']),
 ('DBA', ['ACTIVE EXPRESS CAR & LIMO 2', 'text']),
 ('years', ['2015', 'date']),
 ('Week_Number', [40, 'int']),
 ('Pickup_Start_Date', ['09/27/2015 12:00:00 AM', 'date']),
 ('Pickup_End_Date', ['10/03/2015 12:00:00 AM', 'date']),
 ('Total_Dispatched_Trips', [19, 'int']),
 ('Unique_Dispatched_Vehicle', [6, 'int'])]

In [7]:
# mapped_items.take(50)

In [8]:
res2 = generate_distinct_top5(items)
res1 = generate_null_empty(mapped_items)
%%time
res1.collect()

UsageError: Line magic function `%%time` not found.


In [9]:
mapped_items.take(20)

[('Base_Number', ['B02756', 'text']),
 ('Wave_Number', [3, 'int']),
 ('Base_Name', ['ALLY CAR SERVICE LLC', 'text']),
 ('DBA', ['ACTIVE EXPRESS CAR & LIMO 2', 'text']),
 ('years', ['2015', 'date']),
 ('Week_Number', [40, 'int']),
 ('Pickup_Start_Date', ['09/27/2015 12:00:00 AM', 'date']),
 ('Pickup_End_Date', ['10/03/2015 12:00:00 AM', 'date']),
 ('Total_Dispatched_Trips', [19, 'int']),
 ('Unique_Dispatched_Vehicle', [6, 'int']),
 ('Base_Number', ['B02756', 'text']),
 ('Wave_Number', [3, 'int']),
 ('Base_Name', ['ALLY CAR SERVICE LLC', 'text']),
 ('DBA', ['ACTIVE EXPRESS CAR & LIMO 2', 'text']),
 ('years', ['2015', 'date']),
 ('Week_Number', [41, 'int']),
 ('Pickup_Start_Date', ['10/04/2015 12:00:00 AM', 'date']),
 ('Pickup_End_Date', ['10/10/2015 12:00:00 AM', 'date']),
 ('Total_Dispatched_Trips', [58, 'int']),
 ('Unique_Dispatched_Vehicle', [8, 'int'])]

In [10]:
col_type_items = mapped_items.map(lambda x: ((x[0],x[1][1]),x[1][0]))

In [11]:
col_type_items.take(10)

[(('Base_Number', 'text'), 'B02756'),
 (('Wave_Number', 'int'), 3),
 (('Base_Name', 'text'), 'ALLY CAR SERVICE LLC'),
 (('DBA', 'text'), 'ACTIVE EXPRESS CAR & LIMO 2'),
 (('years', 'date'), '2015'),
 (('Week_Number', 'int'), 40),
 (('Pickup_Start_Date', 'date'), '09/27/2015 12:00:00 AM'),
 (('Pickup_End_Date', 'date'), '10/03/2015 12:00:00 AM'),
 (('Total_Dispatched_Trips', 'int'), 19),
 (('Unique_Dispatched_Vehicle', 'int'), 6)]

In [12]:
col_num_type_items = col_type_items.filter(lambda x: x[0][1]=='int'or x[0][1]=='real')

In [134]:
col_num_type_items.take(10)

[(('Wave_Number', 'int'), 3),
 (('Week_Number', 'int'), 40),
 (('Total_Dispatched_Trips', 'int'), 19),
 (('Unique_Dispatched_Vehicle', 'int'), 6),
 (('Wave_Number', 'int'), 3),
 (('Week_Number', 'int'), 41),
 (('Total_Dispatched_Trips', 'int'), 58),
 (('Unique_Dispatched_Vehicle', 'int'), 8),
 (('Wave_Number', 'int'), 3),
 (('Week_Number', 'int'), 42)]

In [135]:
def seqFunc(local, x):
    max_value = x if x>local[0] else local[0]
    min_value = x if x<local[1] else local[1]    
    return (max_value,min_value,local[2]+x, local[3]+1)

combFunc = (lambda x, y: (max(x[0],y[0]), min(x[1], y[1]), x[2] + y[2], x[3] + y[3]))
num_statistic = col_num_type_items.aggregateByKey((0,0,0,0),seqFunc, combFunc)
num_statistic = num_statistic.map(lambda x: (x[0],[*x[1],x[1][2]/x[1][3]]))
# [(('col_name', 'num_type'),(value, mean))...]
col_num_mean_items =col_num_type_items.join(num_statistic.map(lambda x: (x[0],x[1][4])))


In [136]:
%%time
num_statistic.collect()

CPU times: user 7 ms, sys: 1e+03 µs, total: 8 ms
Wall time: 1.23 s


[(('Wave_Number', 'int'), [4, 0, 272219, 101033, 2.6943572892025376]),
 (('Week_Number', 'int'), [53, 0, 2628258, 101033, 26.013856858650144]),
 (('Total_Dispatched_Trips', 'int'),
  [909056, 0, 549722961, 101033, 5441.0238337968785]),
 (('Unique_Dispatched_Vehicle', 'int'),
  [33578, 0, 16209232, 101033, 160.43502617956509])]

In [148]:
col_num_mean_items.take(10)

[(('Unique_Dispatched_Vehicle', 'int'), (6, 160.43502617956509)),
 (('Unique_Dispatched_Vehicle', 'int'), (8, 160.43502617956509)),
 (('Unique_Dispatched_Vehicle', 'int'), (8, 160.43502617956509)),
 (('Unique_Dispatched_Vehicle', 'int'), (7, 160.43502617956509)),
 (('Unique_Dispatched_Vehicle', 'int'), (10, 160.43502617956509)),
 (('Unique_Dispatched_Vehicle', 'int'), (3, 160.43502617956509)),
 (('Unique_Dispatched_Vehicle', 'int'), (3, 160.43502617956509)),
 (('Unique_Dispatched_Vehicle', 'int'), (3, 160.43502617956509)),
 (('Unique_Dispatched_Vehicle', 'int'), (3, 160.43502617956509)),
 (('Unique_Dispatched_Vehicle', 'int'), (3, 160.43502617956509))]

In [138]:
import math


result_dev= col_num_mean_items.aggregateByKey((0,), lambda local, x:(local[0]+(x[0]-x[1])**2,) , (lambda x, y: (x[0] + y[0])))
result_std = result_dev.map(lambda x:(x[0],math.sqrt(x[1][0])))


In [139]:
result_dev.collect()

[(('Unique_Dispatched_Vehicle', 'int'), (113773130619.74493,)),
 (('Wave_Number', 'int'), (77431.75309055187,)),
 (('Week_Number', 'int'), (22346320.600396164,)),
 (('Total_Dispatched_Trips', 'int'), (93374491035552.31,))]

In [140]:
result_std.collect()

[(('Unique_Dispatched_Vehicle', 'int'), 337302.72844989697),
 (('Wave_Number', 'int'), 278.26561607671164),
 (('Week_Number', 'int'), 4727.1895033303),
 (('Total_Dispatched_Trips', 'int'), 9663047.709473047)]

In [141]:
num_statistic.join(result_std).collect()

[(('Wave_Number', 'int'),
  ([4, 0, 272219, 101033, 2.6943572892025376], 278.26561607671164)),
 (('Unique_Dispatched_Vehicle', 'int'),
  ([33578, 0, 16209232, 101033, 160.43502617956509], 337302.72844989697)),
 (('Week_Number', 'int'),
  ([53, 0, 2628258, 101033, 26.013856858650144], 4727.1895033303)),
 (('Total_Dispatched_Trips', 'int'),
  ([909056, 0, 549722961, 101033, 5441.0238337968785], 9663047.709473047))]

In [143]:
num_statistic.join(result_std).map(lambda x: [x[0],[*x[1][0],x[1][1]]]).collect()

[[('Wave_Number', 'int'),
  [4, 0, 272219, 101033, 2.6943572892025376, 278.26561607671164]],
 [('Unique_Dispatched_Vehicle', 'int'),
  [33578, 0, 16209232, 101033, 160.43502617956509, 337302.72844989697]],
 [('Week_Number', 'int'),
  [53, 0, 2628258, 101033, 26.013856858650144, 4727.1895033303]],
 [('Total_Dispatched_Trips', 'int'),
  [909056, 0, 549722961, 101033, 5441.0238337968785, 9663047.709473047]]]

In [149]:
def generate_num_statistic(col_num_type_items: RDD) -> RDD:
    """
    :param col_num_type_items: [(('Wave_Number', 'int'), 3),(('Week_Number', 'int'), 40)...]
    :return: ['Wave_Number', 'int'], [max_value, min_value, sum, count, mean, std])
    """

    def seqFunc(local, x):
        max_value = x if x > local[0] else local[0]
        min_value = x if x < local[1] else local[1]
        return (max_value, min_value, local[2] + x, local[3] + 1)

    combFunc = (lambda x, y: (max(x[0], y[0]), min(x[1], y[1]), x[2] + y[2], x[3] + y[3]))
    num_statistic = col_num_type_items.aggregateByKey((0, 0, 0, 0), seqFunc, combFunc)
    num_statistic = num_statistic.map(lambda x: (x[0], [*x[1], x[1][2] / x[1][3]]))
    # [(('col_name', 'num_type'),(value, mean))...]
    col_num_mean_items = col_num_type_items.join(num_statistic.map(lambda x: (x[0], x[1][4])))
    result_dev = col_num_mean_items.aggregateByKey((0,), lambda local, x: (local[0] + (x[0] - x[1]) ** 2,), (lambda x, y: (x[0] + y[0])))
    result_std = result_dev.map(lambda x: (x[0], math.sqrt(x[1][0])))
    return num_statistic.join(result_std).map(lambda x: [x[0],[*x[1][0],x[1][1]]])

In [150]:
%%time
res= generate_num_statistic(col_num_type_items)
res.collect()

CPU times: user 40 ms, sys: 8 ms, total: 48 ms
Wall time: 3.38 s


In [152]:
res.collect()

[[('Wave_Number', 'int'),
  [4, 0, 272219, 101033, 2.6943572892025376, 278.26561607671164]],
 [('Unique_Dispatched_Vehicle', 'int'),
  [33578, 0, 16209232, 101033, 160.43502617956509, 337302.72844989697]],
 [('Week_Number', 'int'),
  [53, 0, 2628258, 101033, 26.013856858650144, 4727.1895033303]],
 [('Total_Dispatched_Trips', 'int'),
  [909056, 0, 549722961, 101033, 5441.0238337968785, 9663047.709473047]]]