## Add dependencies

In [1]:
from pyspark.sql import SparkSession
import math
from datetime import datetime
from pyspark.sql.types import TimestampType
from pyspark.sql.functions import udf

### Spark session & context

In [3]:
# spark = SparkSession.\
#         builder.\
#         appName("clustered-science").\
#         master("spark://spark-master:7077").\
#         config("spark.executor.memory", "1g").\
#         getOrCreate()

In [2]:
spark = SparkSession.builder.appName("live-earth-science").master("local").getOrCreate()
sc = spark.sparkContext

22/11/16 17:57:00 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


### Create data frame

In [3]:
import wget

url = "https://raw.githubusercontent.com/RealAndySilver/sparky/0e433680f7577e15c15641001b9a3114137b85e5/data/underdrain.csv"
wget.download(url)

'underdrain.csv'

In [None]:
# data_df = spark.read.csv('./underdrain.csv',
#                          header=True,
#                          inferSchema=True)

In [4]:
data_df = spark.read.csv('underdrain.csv',
                         header=True,
                         inferSchema=True)
data_df.printSchema()
filtered_df = data_df.filter(data_df.DCH_INSTALL_DATE.isNotNull())

                                                                                

root
 |-- OBJECTID: integer (nullable = true)
 |-- DCH_FEA_KEY: integer (nullable = true)
 |-- DCH_GRPH_KEY: integer (nullable = true)
 |-- DCH_FEATYPE_TEXT: string (nullable = true)
 |-- DCH_OWNER_NAME: string (nullable = true)
 |-- DCH_PRBL_FLOW_TYPE: string (nullable = true)
 |-- DCH_MATERIAL_TYPE: string (nullable = true)
 |-- DCH_PIPE_SHP_TEXT: string (nullable = true)
 |-- DCH_LIFECYCLE_STAT: string (nullable = true)
 |-- DCH_DSTNTN_TYPE: string (nullable = true)
 |-- DCH_INSPECT_FLAG: string (nullable = true)
 |-- DCH_CASING_FLAG: string (nullable = true)
 |-- DCH_PERF_PIPE_FLAG: string (nullable = true)
 |-- DCH_LENGTH_FT_NBR: double (nullable = true)
 |-- DCH_WIDTH_IN_NBR: integer (nullable = true)
 |-- DCH_HEIGHT_IN_NBR: integer (nullable = true)
 |-- DCH_UPS_ELEV_FT_NBR: double (nullable = true)
 |-- DCH_DNS_ELEV_FT_NBR: double (nullable = true)
 |-- DCH_INSTALL_DATE: string (nullable = true)
 |-- DCH_LST_UPDT_DATE: string (nullable = true)
 |-- DCH_STREAM_NAME: string (null

In [5]:
filtered_rdd = filtered_df.rdd
filtered_rdd.take(10)

[Row(OBJECTID=43126, DCH_FEA_KEY=4900872, DCH_GRPH_KEY=None, DCH_FEATYPE_TEXT='Under Drain', DCH_OWNER_NAME='Seattle Public Utilities', DCH_PRBL_FLOW_TYPE='Sanitary', DCH_MATERIAL_TYPE='Ductile Iron Pipe', DCH_PIPE_SHP_TEXT='Circular', DCH_LIFECYCLE_STAT='Connected', DCH_DSTNTN_TYPE=None, DCH_INSPECT_FLAG='Y', DCH_CASING_FLAG='N', DCH_PERF_PIPE_FLAG='Y', DCH_LENGTH_FT_NBR=59.34, DCH_WIDTH_IN_NBR=8, DCH_HEIGHT_IN_NBR=8, DCH_UPS_ELEV_FT_NBR=None, DCH_DNS_ELEV_FT_NBR=None, DCH_INSTALL_DATE='2003/10/08 13:24:57+00', DCH_LST_UPDT_DATE='2013/02/01 00:00:00+00', DCH_STREAM_NAME=None, DCH_GSIP_NAME=None, SHAPE_Length=59.3412744546279),
 Row(OBJECTID=43128, DCH_FEA_KEY=4900968, DCH_GRPH_KEY=None, DCH_FEATYPE_TEXT='Under Drain', DCH_OWNER_NAME='Seattle Public Utilities', DCH_PRBL_FLOW_TYPE='Drainage', DCH_MATERIAL_TYPE='Ductile Iron Pipe', DCH_PIPE_SHP_TEXT='Circular', DCH_LIFECYCLE_STAT='Connected', DCH_DSTNTN_TYPE=None, DCH_INSPECT_FLAG='Y', DCH_CASING_FLAG='N', DCH_PERF_PIPE_FLAG='Y', DCH_LEN

### Logic Code

In [6]:
def get_timestamp(date_text):
    """Get timestamp from date text"""
    try:
        date_text = datetime.strptime(date_text, '%Y/%m/%d %H:%M:%S+00')
    except ValueError:
        date_text = datetime.strptime(
            "1900/01/01 01:01:01+00", '%Y/%m/%d %H:%M:%S+00')

    return date_text.timestamp()

In [7]:
def snap_time_to_resolution(timestamp, resolution=1):
    """Snap time to resolution"""
    if resolution <= 0:
        resolution = 1
    resolution_ms = resolution * 60
    snapped_time = datetime.fromtimestamp(
        math.floor(timestamp / resolution_ms) * resolution_ms)

    return snapped_time

# Create UDF

*There are 2 ways of registering a UDF*

### Way #1: 

*add annotation **@udf()** on top of the function*

In [11]:
@udf(returnType=TimestampType())
def snap_row(date, resolution=15):
    """Snap row to resolution"""
    timestamp = get_timestamp(date)
    snapped_time = snap_time_to_resolution(timestamp, resolution)
    return snapped_time


In [12]:
filtered_df.withColumn('SNAPPED_TIME', snap_row(filtered_df.DCH_INSTALL_DATE)).write.mode("overwrite").csv("data/csvs", header=True)

                                                                                

### Way #2: 


*use the **udf()** function and wrap the function to register and a type as 2nd arg*

In [None]:
# def snap_row(date, resolution=15):
#     """Snap row to resolution"""
#     timestamp = get_timestamp(date)
#     snapped_time = snap_time_to_resolution(timestamp, resolution)
#     return snapped_time

# snap_udf = udf(snap_row, 'timestamp')
# filtered_df.withColumn('SNAPPED_TIME', snap_udf(filtered_df.DCH_INSTALL_DATE)).write.mode("overwrite").csv("data/csvs", header=True)

In [None]:
spark.sql("SHOW USER FUNCTIONS").collect()