In [5]:
from pyspark import SparkContext
from pyspark.sql import SparkSession
from io import BytesIO
import boto3
from pyspark.sql.functions import avg

BUCKET_NAME = 'msds-694-19'
FILE_NAME = 'data/openipf.csv'
LOCAL_FILE_NAME = 'openipf_local.csv'

# Create an S3 client
s3 = boto3.client('s3')

# Download the file from S3
s3.download_file(BUCKET_NAME, FILE_NAME, LOCAL_FILE_NAME)

# No additional code is needed if you just want to save the file locally
print(f"File downloaded successfully as {LOCAL_FILE_NAME}")

File downloaded successfully as openipf_local.csv


In [6]:
# Read openipf_local.csv as spark rdd
sc = SparkContext.getOrCreate()
spark = SparkSession(sc)

# Read CSV file as rdd
df = spark.read.csv(LOCAL_FILE_NAME, header=True, inferSchema=True)

In [10]:
# Print the first 5 rows
rdd.take(5)

                                                                                

['Name,Sex,Event,Equipment,Age,AgeClass,BirthYearClass,Division,BodyweightKg,WeightClassKg,Squat1Kg,Squat2Kg,Squat3Kg,Squat4Kg,Best3SquatKg,Bench1Kg,Bench2Kg,Bench3Kg,Bench4Kg,Best3BenchKg,Deadlift1Kg,Deadlift2Kg,Deadlift3Kg,Deadlift4Kg,Best3DeadliftKg,TotalKg,Place,Dots,Wilks,Glossbrenner,Goodlift,Tested,Country,State,Federation,ParentFederation,Date,MeetCountry,MeetState,MeetTown,MeetName',
 'Karen Thomson,F,SBD,Single-ply,39.5,35-39,40-49,Masters 1,51.9,52,105,110.5,-115,,110.5,67.5,70.5,72.5,,72.5,125,130,-136,,130,312.5,1,381.42,390.15,345.86,63.73,Yes,New Zealand,,OceaniaPF,IPF,2002-12-06,New Zealand,BOP,Tauranga,Oceania Championships',
 'Deborah Groves,F,SBD,Single-ply,30.5,24-34,24-39,Open,50.64,52,122.5,-127.5,127.5,,127.5,82.5,87.5,90,,90,140,152.5,-157,,152.5,370,1,459.45,470.74,417.74,76.76,Yes,Australia,,OceaniaPF,IPF,2002-12-06,New Zealand,BOP,Tauranga,Oceania Championships',
 'Renee Orbon,F,SBD,Single-ply,31.5,24-34,24-39,Open,55.76,56,105.5,110,115,,115,82.5,85,87.5,,87

In [13]:
# 1. Average lifts by weight class
avg_lifts_weight_class = df.groupBy("WeightClassKg").agg(
    avg("Best3SquatKg").alias("Average Squat"),
    avg("Best3BenchKg").alias("Average Bench"),
    avg("Best3DeadliftKg").alias("Average Deadlift")
).orderBy("WeightClassKg")

# 2. Total lifts by age class
total_lifts_age_class = df.groupBy("AgeClass").agg(
    (avg("Best3SquatKg") + avg("Best3BenchKg") + avg("Best3DeadliftKg")).alias("Total Average Lift")
).orderBy("AgeClass")

# 3. Performance metrics by country
performance_by_country = df.groupBy("Country").agg(
    avg("TotalKg").alias("Average Total Lift"),
    avg("Wilks").alias("Average Wilks Score")
).orderBy("Country")

# Show the results
avg_lifts_weight_class.show()
total_lifts_age_class.show()
performance_by_country.show()


                                                                                

+-------------+------------------+------------------+------------------+
|WeightClassKg|     Average Squat|     Average Bench|  Average Deadlift|
+-------------+------------------+------------------+------------------+
|         NULL|168.72141279728177| 113.4770378464288|187.34652566735096|
|            +|           161.375|108.08196721311475|182.41964285714286|
|           10|             137.5|              92.5|             190.0|
|          100|237.75957033719678| 161.8160418703158|247.45758259170208|
|         100+|227.24429657794678|156.45361363636363|234.83865384615385|
|        100.2|             270.0|             167.5|             295.0|
|        101.5| 268.3333333333333|             155.0| 286.6666666666667|
|        101.6|280.09000000000003|            182.57|            302.77|
|       101.6+|             255.0|             162.5|             287.5|
|          103|             255.0|             147.5|             275.0|
|        103.2|             250.0|             162.

                                                                                

+--------+------------------+
|AgeClass|Total Average Lift|
+--------+------------------+
|    NULL|502.74615296629884|
|   13-15| 295.0394791531555|
|   16-17|   387.22966981199|
|   18-19|459.03662325698565|
|   20-23|  517.224592663161|
|   24-34| 552.7325101000439|
|   35-39| 554.1151490480497|
|   40-44| 534.5710247961503|
|   45-49|508.66692801846466|
|    5-12|235.17544841050383|
|   50-54| 485.8203239287369|
|   55-59|460.69194579511145|
|   60-64| 432.7958703512679|
|   65-69|391.33968874650753|
|   70-74| 367.5005190948729|
|   75-79|  328.906660260012|
|  80-999|  284.597053402506|
+--------+------------------+





+--------------------+------------------+-------------------+
|             Country|Average Total Lift|Average Wilks Score|
+--------------------+------------------+-------------------+
|                NULL|  384.780407027278| 277.75537061297774|
|         Afghanistan|374.34615384615387|  245.8709230769231|
|             Algeria| 581.6334951456311|  404.4951941747573|
|      American Samoa|             427.5| 304.11999999999995|
|           Argentina| 371.9643362831859|  270.1904858299594|
|             Armenia|356.84939759036143| 239.42783132530113|
|               Aruba|        410.234375| 254.76812499999997|
|           Australia|423.76563935847423| 311.28167760484956|
|             Austria|422.87873521011835|  309.9792018010643|
|          Azerbaijan| 448.4782608695652| 288.18652173913046|
|             Bahamas| 549.9789534883721| 375.95329411764715|
|             Bahrain| 575.8235294117648|  405.9082352941177|
|          Bangladesh|            527.25|            399.764|
|       

                                                                                

23/11/16 16:42:30 WARN HeartbeatReceiver: Removing executor driver with no recent heartbeats: 304713 ms exceeds timeout 120000 ms
23/11/16 16:42:31 WARN SparkContext: Killing executors is not supported by current scheduler.
23/11/16 16:42:36 WARN Executor: Issue communicating with driver in heartbeater
org.apache.spark.SparkException: Exception thrown in awaitResult: 
	at org.apache.spark.util.SparkThreadUtils$.awaitResult(SparkThreadUtils.scala:56)
	at org.apache.spark.util.ThreadUtils$.awaitResult(ThreadUtils.scala:310)
	at org.apache.spark.rpc.RpcTimeout.awaitResult(RpcTimeout.scala:75)
	at org.apache.spark.rpc.RpcEndpointRef.askSync(RpcEndpointRef.scala:101)
	at org.apache.spark.rpc.RpcEndpointRef.askSync(RpcEndpointRef.scala:85)
	at org.apache.spark.storage.BlockManagerMaster.registerBlockManager(BlockManagerMaster.scala:80)
	at org.apache.spark.storage.BlockManager.reregister(BlockManager.scala:642)
	at org.apache.spark.executor.Executor.reportHeartBeat(Executor.scala:1223)
	at o