### Problem definition

In [41]:
import logging
import math

import pyspark.sql.functions as funcs
from pyspark.sql.types import *

import pandas as pd

In [42]:
logger = logging.getLogger('entropy')

In [29]:
df = spark.read.csv('file:///tmp/data.csv', header='true', inferSchema='true')

In [30]:
def calc_location_entropy(df):
    """
    :param df: A Spark dataframe with 2 columns available: location and num_vists
    :return: The location entropy of the dataframe
    """
    assert df is not None, 'DataFrame should not be null'
    assert 'location' in df.columns, 'Location is not present in the dataframe'
    assert 'num_visits' in df.columns, 'Number of visits is not present in the dataframe'
    
    def single_entropy(visits, total_visits):
        p = 1. * visits / total_visits
        return p * math.log(p, 2)
    
    single_entropy_udf = funcs.udf(single_entropy, DoubleType())
    
    total_num_visits = df.select(funcs.sum('num_visits').alias('total_visits')).first()['total_visits']
    total_entropy = df.groupBy('location').agg(funcs.sum('num_visits').alias('num_visits')) \
        .withColumn('total_num_visits', funcs.lit(total_num_visits).cast(LongType())) \
        .withColumn('entropy', single_entropy_udf('num_visits', 'total_num_visits')) \
        .select(funcs.sum('entropy').alias('total_entropy')) \
        .first()['total_entropy']
        
    return -total_entropy

### ETL on input data:

* Filter valid # arrivals
* Type convertion for calculation
* Standadize column names

In [36]:
loc_arrivals_df = df.filter('value != "na"') \
    .withColumn('num_visits', df['value'].cast(IntegerType())) \
    .withColumnRenamed('country', 'location') \
    .cache()

### Global entropy

In [37]:
calc_location_entropy(loc_arrivals_df)

3.129235821660711

### Entropy analytics by year

In [38]:
loc_arrivals_df.select(funcs.min('month'), funcs.max('month')).first()

Row(min(month)=u'1961-01', max(month)=u'2017-09')

In [44]:
# this piece of calculation takes a bit long so re-run it only when necessary
# In Scala it is possible to make `calc_location_entropy` a AggregationFunction and fasten it a lot

entropy_by_year = {}
for year in range(1961, 2017 + 1):
    yearly_df = loc_arrivals_df.filter('month like "%d%%"' % year)
    entropy_by_year[year] = calc_location_entropy(yearly_df)
    print 'calculated entropy for year %d' % year
    
pd_df = pd.DataFrame(data=entropy_by_year)

calculated entropy for year 1961
calculated entropy for year 1962
calculated entropy for year 1963
calculated entropy for year 1964
calculated entropy for year 1965
calculated entropy for year 1966
calculated entropy for year 1967
calculated entropy for year 1968
calculated entropy for year 1969
calculated entropy for year 1970
calculated entropy for year 1971
calculated entropy for year 1972
calculated entropy for year 1973
calculated entropy for year 1974
calculated entropy for year 1975
calculated entropy for year 1976
calculated entropy for year 1977
calculated entropy for year 1978
calculated entropy for year 1979
calculated entropy for year 1980
calculated entropy for year 1981
calculated entropy for year 1982
calculated entropy for year 1983
calculated entropy for year 1984
calculated entropy for year 1985
calculated entropy for year 1986
calculated entropy for year 1987
calculated entropy for year 1988
calculated entropy for year 1989
calculated entropy for year 1990
calculated

ValueError: If using all scalar values, you must pass an index