In [1]:
# Temp for local development
# WARNING: the assignment at the end of this cell replaces the ssl module's default
# HTTPS context factory with the unverified one, so default HTTPS contexts created
# in this process afterwards do not verify server certificates.
# Remove this cell before running anywhere other than a local development machine.
import ssl
import os
# os.environ['PYSPARK_PYTHON'] = '/Library/Frameworks/Python.framework/Versions/3.6/bin/python3.6'
ssl._create_default_https_context = ssl._create_unverified_context

In [None]:
import numpy as np
import pandas as pd
import random

from pyspark import SparkContext, SparkFiles, SQLContext
from pyspark.sql.types import StructType, StructField, StringType, BooleanType

from pyspark.sql import functions as F
from pyspark.sql import types as T

from itertools import chain
import matplotlib.pyplot as plt
from statsmodels.graphics.mosaicplot import mosaic
# https://github.com/shakedzy/dython
from dython.nominal import associations

In [None]:
# Reuse the already-running SparkContext if there is one, otherwise start a new one.
sc = SparkContext.getOrCreate()
# NOTE: despite its name, `spark` is a SQLContext wrapping `sc`. The helpers in this
# notebook use it for `spark.read` and `spark.createDataFrame`.
spark = SQLContext(sc)

In [None]:
import os
import json

# Create the local `.data` directory that the save_*/load_* helpers read and write.
# exist_ok=True keeps this idempotent on re-run and avoids the check-then-create race
# of a separate os.path.exists() test.
os.makedirs('.data', exist_ok=True)

def save_df(df, name):
    """Write `df` as a single-partition parquet dataset at `.data/<name>.parquet`.

    The write uses mode 'overwrite', so an existing dataset at that path is replaced.
    """
    single_partition = df.repartition(1)
    writer = single_partition.write.mode('overwrite')
    target = ".data/" + name + ".parquet"
    writer.parquet(target)

def save_dict(data, name):
    """Serialise `data` as JSON into `.data/<name>.json`.

    The file is opened in "w" mode, so existing content is replaced.
    """
    target = ".data/" + name + ".json"
    with open(target, "w") as handle:
        json.dump(data, handle)

def load_dict(name):
    """Read `.data/<name>.json` and return the decoded JSON value."""
    source = ".data/" + name + ".json"
    with open(source) as handle:
        return json.load(handle)

def load_df(name):
    """Read the parquet dataset at `.data/<name>.parquet` through the notebook's `spark` handle."""
    reader = spark.read
    source = ".data/" + name + ".parquet"
    return reader.parquet(source)

In [None]:
def get_non_string_vars():
    """Concatenate the identifier, continuous, interval and binary variable lists.

    The lists are read from the module-level `shared` mapping (defined outside this
    cell) and joined with `+` in that order.
    """
    # 'continousVars' is spelled this way on purpose: it is the key used in `shared`.
    keys = ('identifierVars', 'continousVars', 'intervalVars', 'binaryVars')
    combined = shared[keys[0]]
    for key in keys[1:]:
        combined = combined + shared[key]
    return combined

def get_all_vars():
    """Return `shared['nominalVars']` followed by everything from `get_non_string_vars()`."""
    nominal = shared['nominalVars']
    return nominal + get_non_string_vars()

In [None]:
# Create Todo list
def init_todo():
    """Create an empty todo list on disk unless `.data/todoList.parquet` already exists.

    The stored frame has two nullable columns: `todo` (string) and `finished` (boolean).
    """
    if os.path.exists('.data/todoList.parquet'):
        return

    columns = [
        StructField('todo', StringType(), True),
        StructField('finished', BooleanType(), True),
    ]
    # DATABRICKS shared.todoList = spark.createDataFrame(spark.sparkContext.emptyRDD(), schema)
    empty_todos = spark.createDataFrame(sc.emptyRDD(), StructType(columns))
    save_df(empty_todos, 'todoList')

def add_todo(desc):
    """Append an unfinished todo described by `desc` unless it is already listed.

    The stored list is pulled fully onto the driver and a fresh DataFrame is built
    from those rows before saving. `save_df` writes with mode 'overwrite' to the
    very path `load_df` reads from, and a lazily evaluated DataFrame still backed
    by those parquet files cannot be written back over them: the overwrite removes
    the input before the write job gets to read it.

    Args:
        desc: Todo description (a string); it is also the de-duplication key.
    """
    stored = load_df('todoList')
    # Materialise now so nothing below depends on the parquet files any more.
    rows = [(row['todo'], row['finished']) for row in stored.collect()]

    # A null `todo` never matches, mirroring SQL equality semantics.
    if any(todo is not None and todo == desc for todo, _ in rows):
        return

    rows.append((desc, False))
    # Reusing the stored schema keeps the `todo` / `finished` column names and types.
    save_df(spark.createDataFrame(rows, stored.schema), 'todoList')

def list_todo():
    """Return the stored todo list as a pandas DataFrame."""
    stored = load_df('todoList')
    return stored.toPandas()

def finish_todo(desc):
    """Mark every todo whose description equals `desc` as finished and save the list.

    Rows are collected to the driver and rebuilt into a fresh DataFrame before
    saving. `save_df` overwrites the same parquet path that `load_df` reads, and a
    lazily evaluated DataFrame still backed by those files cannot be written back
    over them: the overwrite removes the input before the write job reads it.

    Args:
        desc: Description of the todo to mark as finished.
    """
    stored = load_df('todoList')

    def _finished(todo, current):
        # A null `todo` never matches, mirroring SQL equality semantics.
        if todo is not None and todo == desc:
            return True
        return current

    # Materialise now so nothing below depends on the parquet files any more.
    rows = [
        (row['todo'], _finished(row['todo'], row['finished']))
        for row in stored.collect()
    ]
    # Reusing the stored schema keeps the `todo` / `finished` column names and types.
    save_df(spark.createDataFrame(rows, stored.schema), 'todoList')


# Make sure the on-disk todo list exists before add_todo/list_todo/finish_todo are used.
init_todo()


In [None]:
import os
from pyspark import SparkFiles

def import_by_url(url):
    """Fetch the CSV file at `url` through Spark and load it as a DataFrame.

    The file is registered with `sc.addFile`, located again through
    `SparkFiles.get` using the last path component of `url`, and read with a
    header row and schema inference enabled.
    """
    sc.addFile(url)
    local_path = SparkFiles.get(os.path.basename(url))
    reader = spark.read.format('com.databricks.spark.csv').options(header='true', inferschema='true')
    return reader.load("file://" + local_path)


def filter_default(dfIn, f1, f2):
    """Keep only the rows where neither `f1` nor `f2` holds a default date.

    The default dates are "2999-01-01 00:00:00" and "1900-01-01 00:00:00".
    `f1` and `f2` are column names of `dfIn`.
    """
    defaultDates = ["2999-01-01 00:00:00", "1900-01-01 00:00:00"]
    f1_not_default = ~F.col(f1).isin(defaultDates)
    f2_not_default = ~F.col(f2).isin(defaultDates)
    return dfIn.filter(f1_not_default & f2_not_default)


def date_stats(dfIn, f1, f2):
    """Return the `f1` - `f2` difference expressed in days, hours and minutes.

    Rows holding a default date in either column are removed first (see
    `filter_default`). The result has exactly three columns: "days", "hours"
    and "minues" (sic - the minutes column is spelled that way).

    NOTE(review): both columns are cast to long and subtracted, which presumably
    expects timestamp columns so the difference is in seconds - confirm.
    """
    elapsed = F.col(f1).cast("long") - F.col(f2).cast("long")

    # (new column, divisor applied to `elapsed`, columns kept after adding it)
    steps = (
        ("minues", 60., (f1, f2, "minues")),
        ("hours", 3600., (f1, f2, "hours", "minues")),
        ("days", 86400., ("days", "hours", "minues")),
    )

    result = filter_default(dfIn, f1, f2)
    for label, divisor, kept in steps:
        result = result.withColumn(label, elapsed / divisor).select(*kept)
    return result


def annotate_plot(ax):
    """Label every patch of `ax` with its height rounded to two decimals.

    Each label is anchored at the horizontal centre of the patch at its full
    height, then shifted 10 points down and drawn in bold white.
    """
    label_style = dict(
        ha='center',
        va='center',
        color='white',
        fontweight='bold',
        xytext=(0, -10),
        textcoords='offset points',
    )
    for bar in ax.patches:
        top = bar.get_height()
        centre_x = bar.get_x() + bar.get_width() / 2.
        ax.annotate(round(top, 2), (centre_x, top), **label_style)



def single_val(df, groupCol):
    """Return the names of the columns of `df` that hold exactly one distinct value.

    The frame is converted with `toPandas()` and every column is checked, in
    column order.

    NOTE(review): `groupCol` is accepted but never used by this function.
    """
    pdDf = df.toPandas()
    return [name for name in pdDf.columns if len(pdDf[name].unique()) == 1]

def id_to_name(df, idVar, newVar, newIdList):
    """Add column `newVar` to `df`, translating each `idVar` value through `newIdList`.

    The distinct values of `idVar` are collected in `orderBy(idVar)` order and the
    i-th of them is mapped to `newIdList[i]`, so `newIdList` needs at least as
    many entries as there are distinct ids (indexing past its end raises
    IndexError).
    """
    distinct_rows = df.select(idVar).distinct().orderBy(idVar).collect()

    # original id -> replacement, paired up by position
    id_to_label = {row[idVar]: newIdList[pos] for pos, row in enumerate(distinct_rows)}

    # create_map takes a flat key, value, key, value, ... sequence of literals
    flat_pairs = chain.from_iterable(id_to_label.items())
    lookup = F.create_map([F.lit(part) for part in flat_pairs])

    return df.withColumn(newVar, lookup[df[idVar]])
