In [1]:
from pyspark.sql import SparkSession, SQLContext
from creds import USERNAME as UNAME
from creds import PASSWORD as PASS
import datetime
import pandas

URL = "jdbc:mysql://localhost:3306/main_v2"

spark = SparkSession \
    .builder \
    .appName("Database access example") \
    .config('spark.driver.extraClassPath', './mysql-connector-java-8.0.16.jar') \
    .getOrCreate()
# TODO: research the .config options


sc = spark.sparkContext
sql_context = SQLContext(sc)
sc.setLogLevel("INFO")

def get_post_size(start, end):
    sql_query = "SELECT site.name, Length(post.body) AS LENGTH " + \
    "FROM site JOIN post ON site.siteId = post.siteId " + \
    "WHERE dateCreated BETWEEN \"" + \
    start + "\" AND \""+ end + "\""
    posts = sql_context.read.format("jdbc").options(
    url=URL,
    user=UNAME,
    password=PASS,
    query = sql_query, numPartitions=4).load()
    posts.show()

#get_post_size("2018-03-15","2018-03-16")

def get_avg_post_size(site, start, end):
    # join removed for efficiency, if added back, select site.name as well
    template_query = "select ttl " + \
    "FROM ( (select siteId, AVG(LEN) AS ttl " + \
    "FROM (select siteId, length(body) as LEN " + \
    "FROM post WHERE dateCreated BETWEEN \"{0}\" AND \"{1}\" AND siteId = {2}) AS L GROUP BY siteId ) AS T) " #+ \
    #"JOIN site ON site.siteId = T.siteId"
    sql_query = template_query.format(start, end, site)
    #print(sql_query)
    df = sql_context.read.format("jdbc").options(
    url=URL,
    user=UNAME,
    password=PASS,
    query = sql_query, numPartitions=4).load()
    return df
    #df.show()

def get_average_site_counts():
    summaryMap = {}
    site = sql_context.read.format("jdbc").options(
    url=URL,
    dbtable = "site",
    user=UNAME,
    password=PASS).load()
    for row in site.collect():
        siteId = row.siteId
        siteName = row.name
        # start a new entry
        summaryMap[siteName] = []
        # grab a smaller dataframe based on the siteId and year we are analyzing
        df = get_initial_time_slice(datetime.date(2018, 1, 1).strftime("%Y-%m-%d"), datetime.date(2019, 1, 1).strftime("%Y-%m-%d"), siteId)
        for m in range(1, 13):
            # cycle through the months in the year
            start_date = datetime.date(2018, m, 1)
            end_date = datetime.date(2018 if m + 1 < 13 else 2019, m + 1 if m + 1 < 13 else 1, 1)
            summaryMap[siteName].append(filter_by_date_range(df, start_date.strftime("%Y-%m-%d"), end_date.strftime("%Y-%m-%d")))
    map_summary(summaryMap)

def map_summary(summaryMap):
    print("\t\t\t\t JAN \t FEB \t MAR \t APR \t MAY \t JUN \t JUL \t AUG \t SEP \t OCT \t NOV \t DEC")
    for siteName in summaryMap.keys():
        print(siteName, end="\t")
        print(*summaryMap[siteName], sep = "\t ")
        print("")

def get_initial_time_slice(start_date, end_date, site_id):
    template_query = "select siteId, length(body) as LEN, dateCreated " + \
    "FROM post WHERE dateCreated BETWEEN \"{0}\" AND \"{1}\" AND siteId = {2}"
    sql_query = template_query.format(start_date, end_date, site_id)
    df = sql_context.read.format("jdbc").options(
    url=URL,
    user=UNAME,
    password=PASS,
    query = sql_query, numPartitions=4).load()
    return df
        
def filter_by_date_range(df, start_date, end_date):
    """
    filters out a given daterange from a dataframe and averages the length of post bodies
    :param: df the dataframe - should already be filtered by siteId (and possible a slightly larger date range for more efficient queries)
    :param: start_date - the start date to filter
    :param: end_date - the end date to filter
    """
    return round(df.filter(df.dateCreated.between(start_date, end_date)).groupBy().avg("LEN").withColumnRenamed('avg(LEN)','avg').collect()[0].avg)

get_average_site_counts()

TypeError: type NoneType doesn't define __round__ method