In [35]:
import csv

import pyspark
from pyspark.sql.types import *
import pyspark.sql.functions as func

import numpy as np
import pandas as pd

from datetime import *
from dateutil.parser import parse

from bokeh.io import push_notebook, show, output_notebook
from bokeh.layouts import row
from bokeh.plotting import figure
from bokeh.charts import Bar, output_file, show
from bokeh.charts.attributes import cat
from bokeh.palettes import *
from bokeh.models import Range1d
# Render Bokeh plots inline in the notebook instead of in a separate page.
output_notebook()

# `sc` is not created in this notebook; presumably it is the SparkContext
# injected by the PySpark kernel/shell -- TODO confirm for other kernels.
# SQLContext is reached through the imported `pyspark` package so the cell does
# not additionally rely on the shell having pre-imported that name.
sql = pyspark.sql.SQLContext(sc)


In [36]:
data_path = "../input/csv/"
match_rdd = sc.textFile(data_path + "matches.csv")


def _to_match_row(line):
    """Turn one data line of matches.csv into a typed 18-field tuple.

    The line is parsed with the csv module rather than ``str.split(',')`` so
    that a double-quoted field containing a comma stays a single field and
    does not shift every following column.

    Field positions follow the file's header: id, season, city, date, team1,
    team2, toss_winner, toss_decision, result, dl_applied, winner,
    win_by_runs, win_by_wickets, player_of_match, venue, umpire1, umpire2,
    umpire3.

    Raises:
        ValueError: if id, win_by_runs or win_by_wickets is not an integer,
            or the date cannot be parsed.
        IndexError: if the line has fewer than 18 fields.
    """
    p = next(csv.reader([line]))
    return (
        int(p[0]),              # id
        p[1],                   # season (left as a string, as before)
        p[2],                   # city
        parse(p[3]).date(),     # date -> datetime.date
        p[4],                   # team1
        p[5],                   # team2
        p[6],                   # toss_winner
        p[7],                   # toss_decision
        p[8],                   # result
        p[9] == '1',            # dl_applied: '1' -> True, anything else -> False
        p[10],                  # winner ('' when there is no winner)
        int(p[11]),             # win_by_runs
        int(p[12]),             # win_by_wickets
        p[13],                  # player_of_match
        p[14],                  # venue
        p[15],                  # umpire1
        p[16],                  # umpire2
        p[17],                  # umpire3
    )


# The first line of the file is the header. Separating it with plain filters
# avoids the shuffle that RDD.subtract() would trigger.
header_line = match_rdd.first()
match_header = match_rdd.filter(lambda line: line == header_line)
match_no_header = match_rdd.filter(lambda line: line != header_line)

# Blank lines (e.g. a trailing newline at the end of the file) carry no match
# and would otherwise fail inside the parser.
match_temp_rdd = match_no_header.filter(lambda line: line.strip()).map(_to_match_row)

# Column names come from the header, parsed with the same CSV rules as the data.
match_columns = next(csv.reader([header_line]))
match_df = sql.createDataFrame(match_temp_rdd, match_columns)
match_df = match_df.orderBy(match_df.id.asc())
match_df.show()

+---+------+----------+----------+--------------------+--------------------+--------------------+-------------+------+----------+--------------------+-----------+--------------+---------------+--------------------+-----------+--------------+--------------+
| id|season|      city|      date|               team1|               team2|         toss_winner|toss_decision|result|dl_applied|              winner|win_by_runs|win_by_wickets|player_of_match|               venue|    umpire1|       umpire2|       umpire3|
+---+------+----------+----------+--------------------+--------------------+--------------------+-------------+------+----------+--------------------+-----------+--------------+---------------+--------------------+-----------+--------------+--------------+
|  1|  2008| Bangalore|2008-04-18|Kolkata Knight Ri...|Royal Challengers...|Royal Challengers...|        field|normal|     false|Kolkata Knight Ri...|        140|             0|    BB McCullum|M Chinnaswamy Sta...|  Asad Rauf|   

In [37]:
def overall_rank_func(season_num):
    """Print and plot the number of wins per team for one season.

    Matches of ``season_num`` in ``match_df`` are grouped by ``winner``; rows
    with an empty winner are ignored. The ranking is printed with
    ``DataFrame.show`` and drawn as a Bokeh bar chart in the notebook.

    Args:
        season_num: season to rank; compared against ``match_df.season``.

    Returns:
        None. If the season has no match with a winner, a message is printed
        and no chart is drawn.
    """
    overall_ranking = (
        match_df
        .filter(match_df.season == season_num)
        .filter("winner != ''")
        .groupBy("winner")
        .count()
        # Secondary sort on the team name keeps ties in a stable order.
        .orderBy(func.desc("count"), func.asc("winner"))
        .selectExpr("winner as Teams", "count as Wins")
    )
    overall_ranking.show(truncate=False)

    overall_pdf = overall_ranking.toPandas()
    # The pandas copy already holds every row, so no extra Spark count() job.
    n_teams = len(overall_pdf)
    if n_teams == 0:
        print("No matches with a winner found for season " + str(season_num))
        return

    # brewer palettes exist only for certain sizes (3-11 for "Spectral" per
    # the Bokeh docs); for any other team count, cycle through the largest
    # palette instead of raising KeyError.
    spectral = brewer["Spectral"]
    if n_teams in spectral:
        clr = spectral[n_teams]
    else:
        widest = spectral[max(spectral)]
        clr = [widest[i % len(widest)] for i in range(n_teams)]

    p = Bar(
        overall_pdf,
        values="Wins",
        color="Teams",
        palette=clr,
        label=cat(columns="Teams", sort=False),  # keep the ranking order on the x axis
        xgrid=True,
        xlabel="Teams",
        ylabel="Wins",
        title="Overall Standings " + str(season_num),
        legend='top_right',
        plot_width=950,
        bar_width=0.6,
    )
    show(p, notebook_handle=True)

overall_rank_func(2013)

+---------------------------+----+
|Teams                      |Wins|
+---------------------------+----+
|Mumbai Indians             |13  |
|Chennai Super Kings        |12  |
|Rajasthan Royals           |11  |
|Sunrisers Hyderabad        |10  |
|Royal Challengers Bangalore|9   |
|Kings XI Punjab            |8   |
|Kolkata Knight Riders      |6   |
|Pune Warriors              |4   |
|Delhi Daredevils           |3   |
+---------------------------+----+



In [38]:
def consistency_func():
    """Print and plot a per-team win-consistency score across seasons.

    For every team, wins are counted per season from ``match_df`` (rows with
    an empty winner are ignored). Teams that won matches in more than one
    season are scored as::

        Consistency = 100 - stddev_pop(wins per season) / total wins * 100

    The scores are printed, highest first, and drawn as a Bokeh bar chart.

    Note: a season in which a team won nothing produces no row, so it does
    not enter that team's standard deviation.

    Returns:
        None. If no team qualifies, a message is printed and no chart is drawn.
    """
    wins_per_season = (
        match_df
        .filter("winner != ''")
        .groupBy("winner", "season")
        .count()
    )

    # One aggregation yields the spread, the total and the number of seasons
    # with wins, so multi-season teams are selected without a second
    # job + collect() + isin().
    per_team = (
        wins_per_season
        .groupBy("winner")
        .agg(
            func.stddev_pop("count").alias("stddev"),
            func.sum("count").alias("total_wins"),
            func.count("season").alias("seasons_won"),
        )
        .filter("seasons_won > 1")
    )

    consistency_df = (
        per_team
        .withColumn(
            "final_deviations",
            100 - (func.col("stddev") / func.col("total_wins")) * 100,
        )
        .orderBy("final_deviations", ascending=False)
        .selectExpr("winner as Teams", "final_deviations as Consistency")
    )
    consistency_df.show(truncate=False)

    consistency_pdf = consistency_df.toPandas()
    # The pandas copy already holds every row, so no extra Spark count() job.
    n_teams = len(consistency_pdf)
    if n_teams == 0:
        print("No team has wins in more than one season")
        return

    # brewer palettes exist only for certain sizes (3-11 for "RdYlGn" per the
    # Bokeh docs); for any other team count, cycle through the largest
    # palette instead of raising KeyError.
    rd_yl_gn = brewer["RdYlGn"]
    if n_teams in rd_yl_gn:
        clr = rd_yl_gn[n_teams]
    else:
        widest = rd_yl_gn[max(rd_yl_gn)]
        clr = [widest[i % len(widest)] for i in range(n_teams)]

    p2 = Bar(
        consistency_pdf,
        values="Consistency",
        color="Teams",
        palette=clr,
        label=cat(columns="Teams", sort=False),  # keep the score order on the x axis
        xlabel="Teams",
        ylabel="Win Consistency %age",
        title="IPL Performance Consistencies",
        legend='top_right',
        plot_width=950,
        bar_width=0.6,
    )
    # Zoom in on the top of the scale; the upper bound of 110 is unchanged
    # from the original. The floor stays at 90 unless a score falls below it,
    # so no bar disappears under the axis.
    y_floor = min(90, float(consistency_pdf["Consistency"].min()) - 1)
    p2.y_range = Range1d(y_floor, 110)
    show(p2, notebook_handle=True)

consistency_func()

+---------------------------+-----------------+
|Teams                      |Consistency      |
+---------------------------+-----------------+
|Pune Warriors              |100.0            |
|Chennai Super Kings        |98.52414888400493|
|Royal Challengers Bangalore|97.32503181803925|
|Mumbai Indians             |97.02764797791212|
|Kolkata Knight Riders      |96.25963296004588|
|Rajasthan Royals           |96.07664287037993|
|Kings XI Punjab            |95.57322737283558|
|Delhi Daredevils           |94.89496812290929|
|Sunrisers Hyderabad        |93.93660937409167|
|Deccan Chargers            |91.16810450009262|
+---------------------------+-----------------+

