In [1]:
from pyspark.sql import SparkSession

spark = SparkSession \
    .builder \
    .master("local") \
    .appName("review_and_category_analytics") \
    .config("spark.executor.memory", '8g') \
    .config('spark.executor.cores', '4') \
    .config('spark.cores.max', '4') \
    .config("spark.driver.memory",'8g') \
    .getOrCreate()

sc = spark.sparkContext

## NAMES: Katherine Brickley (kgb3mf), Jack Peele, Will McDevitt

In [4]:
data = spark.read.csv('/project/ds5559/group14_tennis/tennis_atp/atp_matches_2012.csv', header = True)
data_path = '/project/ds5559/group14_tennis/tennis_atp/'

In [None]:
import os

# assign path
path, dirs, files = next(os.walk(data_path))
file_count = len(files)
# create empty list
dataframes_list = []
 
# append datasets to the list
for i in range(file_count):
    if '.csv' in files[i]:
        temp_df =  spark.read.csv(data_path+files[i])
        dataframes_list.append(temp_df)
     
# display datasets
'''for dataset in dataframes_list:
    dataset.show(5)
    print('\n')
'''

In [4]:
dataframes_list[5].show(5)

+--------------------+-----------------+-------+---------+-------------+------------+---------+---------+-----------+------------+-------------------+-----------+---------+----------+-------------+--------+----------+-----------+--------------------+----------+--------+---------+-------------+-----------+-------+-----+-------+-----+----+------+-------+--------+--------+-------+---------+---------+-----+----+------+-------+--------+--------+-------+---------+---------+-----------+------------------+----------+-----------------+
|                 _c0|              _c1|    _c2|      _c3|          _c4|         _c5|      _c6|      _c7|        _c8|         _c9|               _c10|       _c11|     _c12|      _c13|         _c14|    _c15|      _c16|       _c17|                _c18|      _c19|    _c20|     _c21|         _c22|       _c23|   _c24| _c25|   _c26| _c27|_c28|  _c29|   _c30|    _c31|    _c32|   _c33|     _c34|     _c35| _c36|_c37|  _c38|   _c39|    _c40|    _c41|   _c42|     _c43|     

In [5]:
!pwd

/sfs/qumulo/qhome/kgb3mf/repos


In [None]:
first_dataset = dataframes_list[0]
first_dataset.show(5)

In [None]:
data.take(1)

In [None]:
# CHange this to full dataset later
print("Number of columns:", len(data.columns))

In [None]:
# Make a column called numNull then filter out if over 6
data = data.withColumn("numNull", sum(data[col].isNull().cast('int') for col in data.columns))

data = data.filter(data.numNull < 6)
data.count()

In [None]:
data.show()

In [None]:

winners = [col for col in data.columns if col.startswith('w_')]
winners


In [None]:
losers = [col for col in data.columns if col.startswith('l_')]
losers

In [None]:

final_df = data.select(winners)
new_columns = [col.strip('w_') for col in winners]

for i in range(0, len(final_df.columns)):
    final_df = final_df.withColumnRenamed(final_df.columns[i], new_columns[i])


In [None]:
from pyspark.sql.functions import lit

final_df = final_df.withColumn('result', lit(1))
final_df.show(5)

In [None]:
final_df2 = data.select(losers)

for i in range(0, len(final_df2.columns)):
    final_df2 = final_df2.withColumnRenamed(final_df2.columns[i], new_columns[i])
    
final_df2 = final_df2.withColumn('result', lit(0))
final_df2.show(5)


In [None]:
final = final_df.union(final_df2)

final.show(5)

##### Change names of some columns

In [None]:
final = final.withColumnRenamed('1stWon', 'firstWon')
final = final.withColumnRenamed('2ndWon', 'secondWon')
final = final.withColumnRenamed('1stIn', 'firstIn')


### Number of records

In [18]:
print('Number of records: ', final.count())

Number of records:  5352


### Number of columns

In [19]:
print('Number of columns: ', len(final.columns))

Number of columns:  10


### Statistical summary of response variable

In [20]:
print(final.toPandas().result.value_counts())

final.toPandas().describe()

# This is a binary response variable, so the only thing that really matters is value counts

1    2676
0    2676
Name: result, dtype: int64


Unnamed: 0,result
count,5352.0
mean,0.5
std,0.500047
min,0.0
25%,0.0
50%,0.5
75%,1.0
max,1.0


In [21]:
# final.groupBy('result').count().orderBy('result')

### Statistical summary of potential predictor variables (if there are a large number of predictors, select the top 10)


In [22]:
final.toPandas().describe()

Unnamed: 0,result
count,5352.0
mean,0.5
std,0.500047
min,0.0
25%,0.0
50%,0.5
75%,1.0
max,1.0


In [23]:
final.toPandas().describe(include = "all")

Unnamed: 0,ace,df,svpt,firstIn,firstWon,secondWon,SvGms,bpSaved,bpFaced,result
count,5352.0,5352.0,5352.0,5352.0,5352.0,5352.0,5352.0,5352.0,5352.0,5352.0
unique,38.0,17.0,196.0,136.0,100.0,54.0,36.0,21.0,29.0,
top,3.0,2.0,54.0,35.0,29.0,12.0,10.0,3.0,6.0,
freq,583.0,1089.0,102.0,144.0,214.0,358.0,782.0,773.0,505.0,
mean,,,,,,,,,,0.5
std,,,,,,,,,,0.500047
min,,,,,,,,,,0.0
25%,,,,,,,,,,0.0
50%,,,,,,,,,,0.5
75%,,,,,,,,,,1.0


In [24]:
final = final.withColumn("ace",final.ace.cast('double'))
final = final.withColumn("df",final.df.cast('double'))
final = final.withColumn("svpt",final.svpt.cast('double'))
final = final.withColumn("firstIn",final.firstIn.cast('double'))
final = final.withColumn("firstWon",final.firstWon.cast('double'))
final = final.withColumn("secondWon",final.secondWon.cast('double'))
final = final.withColumn("SvGms",final.SvGms.cast('double'))
final = final.withColumn("bpSaved",final.bpSaved.cast('double'))
final = final.withColumn("bpFaced",final.bpFaced.cast('double'))


### Logistic Regression

In [25]:
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.feature import StandardScaler

In [40]:
seed = 314
train_test = [0.8, 0.2]
train_data, test_data = final.randomSplit(train_test, seed)

In [41]:
assembler = VectorAssembler(inputCols=['ace','df','svpt','firstIn','firstWon','secondWon','SvGms','bpSaved','bpFaced'],
                            outputCol="features")

transformed = assembler.transform(train_data)
transformed.show(1)

+---+---+----+-------+--------+---------+-----+-------+-------+------+--------------------+
|ace| df|svpt|firstIn|firstWon|secondWon|SvGms|bpSaved|bpFaced|result|            features|
+---+---+----+-------+--------+---------+-----+-------+-------+------+--------------------+
|0.0|0.0| 8.0|    5.0|     5.0|      3.0|  2.0|    0.0|    0.0|     1|[0.0,0.0,8.0,5.0,...|
+---+---+----+-------+--------+---------+-----+-------+-------+------+--------------------+
only showing top 1 row



In [42]:
scaler = StandardScaler(inputCol="features", outputCol = "scaled")
scalerModel = scaler.fit(transformed)


df2 = scalerModel.transform(transformed)
#df2.groupBy('result').count().show()
df2.show(10)

+---+---+----+-------+--------+---------+-----+-------+-------+------+--------------------+--------------------+
|ace| df|svpt|firstIn|firstWon|secondWon|SvGms|bpSaved|bpFaced|result|            features|              scaled|
+---+---+----+-------+--------+---------+-----+-------+-------+------+--------------------+--------------------+
|0.0|0.0| 8.0|    5.0|     5.0|      3.0|  2.0|    0.0|    0.0|     1|[0.0,0.0,8.0,5.0,...|[0.0,0.0,0.268092...|
|0.0|0.0| 8.0|    6.0|     6.0|      2.0|  2.0|    0.0|    0.0|     1|[0.0,0.0,8.0,6.0,...|[0.0,0.0,0.268092...|
|0.0|0.0|10.0|    5.0|     5.0|      3.0|  2.0|    0.0|    0.0|     1|[0.0,0.0,10.0,5.0...|[0.0,0.0,0.335116...|
|0.0|0.0|31.0|   22.0|    17.0|      5.0|  5.0|    3.0|    3.0|     1|[0.0,0.0,31.0,22....|[0.0,0.0,1.038860...|
|0.0|0.0|40.0|   28.0|    22.0|      8.0|  7.0|    0.0|    0.0|     1|[0.0,0.0,40.0,28....|[0.0,0.0,1.340464...|
|0.0|0.0|46.0|   38.0|    27.0|      5.0|  8.0|    0.0|    1.0|     1|[0.0,0.0,46.0,38....|[0.0,

In [28]:
# x_cols = [x for x in df2.columns if x != 'result']

# cols = [x for x in df2.columns if x == 'result' or x == 'scaled']
# new_df = df2[cols]
# new_df.show(5)

In [30]:
from pyspark.mllib.classification import LogisticRegressionWithSGD, LogisticRegressionModel
from pyspark.mllib.regression import LabeledPoint

In [43]:
from pyspark.ml.classification import LogisticRegression

lr = LogisticRegression(labelCol='result',
                        featuresCol='scaled',
                        maxIter=100, 
                        regParam=0.8, 
                        elasticNetParam=0)

# Fit the model
lrModel = lr.fit(df2)


In [44]:
print(lrModel.coefficients)

(9,[],[])


In [33]:
from pyspark.ml.evaluation import BinaryClassificationEvaluator

# compute predictions. this will append column "prediction" to dataframe
lrPred = lrModel.transform(df2)
lrPred.select('probability','prediction').show(50,truncate=False)

# set up evaluator
evaluator = BinaryClassificationEvaluator(rawPredictionCol="prediction",
                                          labelCol="result",
                                          metricName="areaUnderPR")

# pass to evaluator the DF with predictions, labels
aupr = evaluator.evaluate(lrPred)

print("Area under PR Curve:", aupr)

+-----------+----------+
|probability|prediction|
+-----------+----------+
|[0.5,0.5]  |0.0       |
|[0.5,0.5]  |0.0       |
|[0.5,0.5]  |0.0       |
|[0.5,0.5]  |0.0       |
|[0.5,0.5]  |0.0       |
|[0.5,0.5]  |0.0       |
|[0.5,0.5]  |0.0       |
|[0.5,0.5]  |0.0       |
|[0.5,0.5]  |0.0       |
|[0.5,0.5]  |0.0       |
|[0.5,0.5]  |0.0       |
|[0.5,0.5]  |0.0       |
|[0.5,0.5]  |0.0       |
|[0.5,0.5]  |0.0       |
|[0.5,0.5]  |0.0       |
|[0.5,0.5]  |0.0       |
|[0.5,0.5]  |0.0       |
|[0.5,0.5]  |0.0       |
|[0.5,0.5]  |0.0       |
|[0.5,0.5]  |0.0       |
|[0.5,0.5]  |0.0       |
|[0.5,0.5]  |0.0       |
|[0.5,0.5]  |0.0       |
|[0.5,0.5]  |0.0       |
|[0.5,0.5]  |0.0       |
|[0.5,0.5]  |0.0       |
|[0.5,0.5]  |0.0       |
|[0.5,0.5]  |0.0       |
|[0.5,0.5]  |0.0       |
|[0.5,0.5]  |0.0       |
|[0.5,0.5]  |0.0       |
|[0.5,0.5]  |0.0       |
|[0.5,0.5]  |0.0       |
|[0.5,0.5]  |0.0       |
|[0.5,0.5]  |0.0       |
|[0.5,0.5]  |0.0       |
|[0.5,0.5]  |0.0       |


In [34]:
# lrModel.predict(test_data)

In [35]:
df2.show(10)

+----+---+-----+-------+--------+---------+-----+-------+-------+------+--------------------+--------------------+
| ace| df| svpt|firstIn|firstWon|secondWon|SvGms|bpSaved|bpFaced|result|            features|              scaled|
+----+---+-----+-------+--------+---------+-----+-------+-------+------+--------------------+--------------------+
|10.0|0.0| 92.0|   57.0|    41.0|     17.0| 15.0|    4.0|    7.0|     1|[10.0,0.0,92.0,57...|[1.92735559605842...|
|23.0|1.0|104.0|   55.0|    47.0|     31.0| 17.0|    3.0|    3.0|     1|[23.0,1.0,104.0,5...|[4.43291787093437...|
| 8.0|3.0| 71.0|   31.0|    26.0|     26.0| 11.0|    5.0|    6.0|     1|[8.0,3.0,71.0,31....|[1.54188447684674...|
| 1.0|2.0| 75.0|   49.0|    31.0|     16.0| 13.0|    3.0|    6.0|     1|[1.0,2.0,75.0,49....|[0.19273555960584...|
|10.0|1.0| 56.0|   28.0|    23.0|     18.0|  8.0|    2.0|    2.0|     1|[10.0,1.0,56.0,28...|[1.92735559605842...|
| 4.0|2.0| 66.0|   40.0|    32.0|     14.0| 11.0|    6.0|    7.0|     1|[4.0,2.0