## <font color='black'> Spark Connection and Initial Setup </font>

In [None]:
import pyspark
from pyspark.sql import SparkSession

from urllib.parse import quote_plus

# Connection settings. The right-hand names (username, passwd, bucket,
# s3_server_url, accesskey, secretkey, dbname, sql_user, sql_passwd) are
# placeholders that must already be defined when this cell runs.
# NOTE(review): `passwd = passwd` only works if `passwd` already exists; consider
# loading credentials from the environment or a secrets store instead.
user = username
passwd = passwd
s3_bucket = bucket
s3_server = s3_server_url
s3_access_key = accesskey
s3_secret_key = secretkey
# Percent-encode the credentials: characters such as '@', ':' or '/' in a
# username/password would otherwise corrupt the MongoDB connection URI.
mongo_uri = (
    f"mongodb://{quote_plus(str(user))}:{quote_plus(str(passwd))}"
    "@mongo:27017/admin?authSource=admin"
)
server_name = "jdbc:sqlserver://mssql"
database_name = dbname
mssql_user = sql_user
mssql_pw = sql_passwd
mssql_url = (
    f"{server_name};databaseName={database_name}"
    ";encrypt=true;trustServerCertificate=true;"
)

# Maven coordinates resolved by Spark at session start-up.
jars = [
    "org.apache.hadoop:hadoop-aws:3.1.2",
    "org.mongodb.spark:mongo-spark-connector_2.12:3.0.1",
    "com.microsoft.azure:spark-mssql-connector_2.12:1.2.0",
    "com.microsoft.sqlserver:mssql-jdbc:12.2.0.jre11",
]

# Local Spark session wired to MinIO (s3a), MongoDB and SQL Server. The S3
# settings use the variables defined above so there is one source of truth.
spark = (
    SparkSession.builder.master("local")
    .appName("jupyter-pyspark")
    .config("spark.jars.packages", ",".join(jars))
    .config("spark.hadoop.fs.s3a.endpoint", s3_server)
    .config("spark.hadoop.fs.s3a.access.key", s3_access_key)
    .config("spark.hadoop.fs.s3a.secret.key", s3_secret_key)
    .config("spark.hadoop.fs.s3a.fast.upload", True)
    # Path-style access is what MinIO-style endpoints typically expect.
    .config("spark.hadoop.fs.s3a.path.style.access", True)
    .config("spark.hadoop.fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem")
    .config("spark.mongodb.input.uri", mongo_uri)
    .config("spark.mongodb.output.uri", mongo_uri)
    .getOrCreate()
)
sc = spark.sparkContext
sc.setLogLevel("ERROR")

## <font color='black'>Importing necessary pyspark libraries</font>

In [None]:
from pyspark.sql.functions import split, col, expr, sum, row_number, struct, lit, when, collect_list
from pyspark.sql.column import Column
from pyspark.sql.window import Window
from pyspark.sql import functions as F
from pyspark.sql.types import StructType,StructField, StringType, IntegerType, LongType,ArrayType

## <font color='black'> Reading players data from SQL Server. </font>

In [None]:
# Load the players table (its name comes from the `tablename` placeholder)
# through the Spark SQL Server connector.
players = (
    spark.read.format("com.microsoft.sqlserver.jdbc.spark")
    .options(
        driver="com.microsoft.sqlserver.jdbc.SQLServerDriver",
        url=mssql_url,
        dbtable=tablename,
        user=mssql_user,
        password=mssql_pw,
    )
    .load()
)

## <font color='black'> Reading teams data from SQL Server. </font>

In [None]:
# Load the teams table (its name comes from the `tablename2` placeholder)
# through the Spark SQL Server connector.
teams = (
    spark.read.format("com.microsoft.sqlserver.jdbc.spark")
    .options(
        driver="com.microsoft.sqlserver.jdbc.SQLServerDriver",
        url=mssql_url,
        dbtable=tablename2,
        user=mssql_user,
        password=mssql_pw,
    )
    .load()
)

## <font color='black'> Q1) Write a Drill SQL query to list the team and player data. Specifically, display team name, team wins, team losses, player name, player shots and player goals.
</font>

In [None]:
-- List every player with their team's record: team name, wins, losses,
-- player name, shots and goals (players joined to teams on team id).
SELECT t.name   AS Team_Name,
       t.wins   AS Team_wins,
       t.losses AS Team_Losses,
       p.name   AS Player_Name,
       p.shots,
       p.goals
FROM mssql.dbo.players p
JOIN mssql.dbo.teams t ON p.teamid = t.id


![Alt text](image-28.png)
![Alt text](image-29.png)

## <font color='black'> Q2) Write a drill SQL query to display the gamestream. Label each of the columns in the gamestream with their appropriate columns names from the data dictionary. </font>

In [None]:
-- Label the positional gamestream fields with their data-dictionary names.
SELECT columns[0] AS event_id,
       columns[1] AS timestamp_of_goal,
       columns[2] AS team_id,
       columns[3] AS player_jersey_number,
       columns[4] AS goal_nogoal
FROM minio.`gamestream.txt`


![Alt text](image-30.png)
![Alt text](image-31.png)

## <font color='black'> Q3) Write pyspark code (in SQL or DataFrame API) to display the gamestream. Label each of the columns in the gamestream with their appropriate columns names from the data dictionary.</font>

In [None]:
# Read the raw gamestream file from MinIO through the s3a connector; each line
# of the file becomes one row in a single string column named `value`.
url = "s3a://gamestreams/gamestream.txt"
read_text = spark.read.text(url)
read_text.show(5)



![Alt text](image-32.png)

In [None]:
# Break each gamestream line on single spaces and name the pieces after the
# data dictionary; the raw `value` column is not carried forward.
from pyspark.sql.functions import split, col
tokens = split(read_text["value"], " ")
games_data = read_text.select(
    tokens.getItem(0).alias("event_id"),
    tokens.getItem(1).alias("timestamp_of_goal"),
    tokens.getItem(2).alias("team_id"),
    tokens.getItem(3).alias("player_jersey_number"),
    tokens.getItem(4).alias("goal_nogoal"),
)

games_data.show(5)

![Alt text](image-1.png)

## <font color='black'> Q4) Write pyspark code (in SQL or DataFrame API) to group the gamestream by team/player id adding up the shots and goals.</font>
## <font color='black'>- Include the team score.</font>
## <font color ='black'>- Include the latest event id and the timestamp for that event id.</font>

In [None]:
# Cast the numeric gamestream fields from string to integer; timestamp_of_goal
# is left as a string. Columns keep their positions because withColumn
# replaces an existing column in place.
for numeric_field in ("goal_nogoal", "team_id", "event_id", "player_jersey_number"):
    games_data = games_data.withColumn(numeric_field, col(numeric_field).cast("integer"))
games_data.show(2)



![Alt text](image-2.png)

In [None]:
# Expose the typed gamestream to Spark SQL.
games_data.createOrReplaceTempView("games_data_1")

# One row per gamestream event, annotated with whole-stream window totals:
#   team_score    - SUM(goal_nogoal) per team
#   player_shots  - number of events per (team, jersey number)
#   player_goals  - SUM(goal_nogoal) per (team, jersey number)
#   event_id      - the latest (highest) event id in the stream
#   max_timestamp - the timestamp of that latest event, taken from the row with
#                   the highest event_id (not MIN/MAX over the timestamps).
# NOTE(review): assumes goal_nogoal is 1 for a goal and 0 for a miss.
stream_data = spark.sql('''
SELECT event_id AS _id,
       timestamp_of_goal AS timestamp,
       player_jersey_number,
       team_id,
       SUM(goal_nogoal) OVER (PARTITION BY team_id) AS team_score,
       COUNT(goal_nogoal) OVER (PARTITION BY team_id, player_jersey_number) AS player_shots,
       SUM(goal_nogoal) OVER (PARTITION BY team_id, player_jersey_number) AS player_goals,
       MAX(event_id) OVER () AS event_id,
       FIRST_VALUE(timestamp_of_goal) OVER (
           ORDER BY event_id DESC
           ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING
       ) AS max_timestamp
FROM games_data_1
ORDER BY team_id
''')

stream_data.show(5)

![Alt text](image-3.png)

## <font color='black'> Q5) Write pyspark code (in SQL or DataFrame API) to join the output from question 4 with the player and team reference data in mssql so that you have the data necessary for the box score. </font>

In [None]:

#Joining the players data, teams data and gamestream dataframe


def _read_mssql_table(table_name):
    """Load `table_name` from the SQL Server database through the Spark connector."""
    return (
        spark.read.format("com.microsoft.sqlserver.jdbc.spark")
        .option("driver", "com.microsoft.sqlserver.jdbc.SQLServerDriver")
        .option("url", mssql_url)
        .option("dbtable", table_name)
        .option("user", mssql_user)
        .option("password", mssql_pw)
        .load()
    )


# Rename the player columns so they do not collide with the teams columns
# (both tables have `id` and `name`) once the frames are joined.
player = (
    _read_mssql_table("players")
    .withColumnRenamed("name", "PlayerName")
    .withColumnRenamed("id", "PlayerID")
    .withColumnRenamed("number", "PlayerJNumber")
)
player.show()

teams = _read_mssql_table("teams").withColumnRenamed("id", "id_team")
teams.show()

# Attach the player (matched on team id + jersey number) and then the team to
# every gamestream row; the next cell reads `stream_player_team`.
stream_player = stream_data.join(
    player,
    (stream_data.team_id == player.teamid)
    & (stream_data.player_jersey_number == player.PlayerJNumber),
)
stream_player_team = stream_player.join(teams, stream_player.team_id == teams.id_team)


![Alt text](image-4.png)

![Alt text](image-5.png)

In [None]:
# Project the joined gamestream/player/team rows onto the box-score columns,
# renaming the player id/name to snake_case.
stream_player_team.createOrReplaceTempView("final")

final = spark.sql('''
SELECT _id, timestamp, team_id, name, conference, wins, losses,
       team_score, event_id, max_timestamp,
       PlayerID AS player_id, PlayerName AS player_name,
       player_shots, player_goals
FROM final
''')

final.show(5)

# Shape of the result as (row count, column count).
row_total = final.count()
column_total = len(final.columns)
print((row_total, column_total))


![Alt text](image-6.png)
![Alt text](image-7.png)

## <font color = 'black'> Q6) Write pyspark code (in SQL or DataFrame API) to transform the output from question 5 into the box score document structure shown in part 3.1. </font>

In [None]:
#Question 6
# Build the box-score input from the question-5 frame (`final`) so this cell
# does not depend on question 8 having run first: tag each row as Home/Away and
# add the player's shooting percentage.
final.createOrReplaceTempView("boxscore")
boxscore = spark.sql('''SELECT *, CASE WHEN team_id = 101 THEN "Home" ELSE "Away" END as Home_Away from boxscore
''')
# F.round (the Column function) -- Python's builtin round cannot take a Column.
boxscore = boxscore.withColumn("pct", F.round(boxscore.player_goals / boxscore.player_shots, 2))

b1 = boxscore


def _box_score_side(side_team_id):
    """Return the team struct for rows whose team_id equals `side_team_id`, else null.

    The struct holds team_id, conference, wins, losses, team_score and a
    `players` array containing the row's single player entry.
    """
    player_entry = F.struct("player_id", "player_name", "player_shots", "player_goals", "pct")
    team_entry = F.struct(
        "team_id", "conference", "wins", "losses", "team_score",
        F.array(player_entry).alias("players"),
    )
    return F.when(col("team_id") == side_team_id, team_entry)


#Creating a custom schema
# NOTE(review): the home (101) and away (205) team ids are hard-coded.
custom_schema = b1.select(
    "_id",
    "timestamp",
    _box_score_side(101).alias("Home"),
    _box_score_side(205).alias("Away"),
)

custom_schema.printSchema()
custom_schema.show()



![Alt text](image-8.png)
![Alt text](image-9.png)
![Alt text](image-10.png)

## <font color='black'> Q7) Write pyspark code (in SQL or DataFrame API) to write the box score completed in question 6 to the mongo.sidearm.boxscores collection. </font>


In [None]:
#Question 7
# Save the box-score documents to MongoDB (database "sidearm", collection
# "boxscore") in overwrite mode, replacing what was written previously.
# NOTE(review): the question names the collection `boxscores`; the Drill
# queries in this notebook read `boxscore`, so the name is kept.
(
    custom_schema.write.format("mongo")
    .mode("overwrite")
    .option("database", "sidearm")
    .option("collection", "boxscore")
    .save()
)



![Alt text](image-11.png)

## <font color='black'> Q8) Combine parts 4-7 into a single pyspark script that will run the entire process of creating the box score document. Make sure to run this a couple of times while the game stream is going on. </font>

In [None]:
#Question 8
from pyspark.sql.functions import *
import pyspark.sql.functions as f
from pyspark.sql.types import StructType, StructField, IntegerType, StringType, DoubleType, ArrayType, MapType
from pyspark.sql.functions import struct, when,col

#Reading the gamestream data
# The script derives games_data from this fresh read instead of reusing the
# frame built in questions 3-4, so every run reflects the stream as it is now.
url = "s3a://gamestreams/gamestream.txt"
read_text = spark.read.text(url)

# Split each space-delimited line into the data-dictionary columns and cast the
# numeric ones (timestamp_of_goal stays a string, as in questions 3-4).
tokens = f.split(col("value"), " ")
games_data = read_text.select(
    tokens.getItem(0).cast("integer").alias("event_id"),
    tokens.getItem(1).alias("timestamp_of_goal"),
    tokens.getItem(2).cast("integer").alias("team_id"),
    tokens.getItem(3).cast("integer").alias("player_jersey_number"),
    tokens.getItem(4).cast("integer").alias("goal_nogoal"),
)

#Creating the boxscore document
games_data.createOrReplaceTempView("games_data_1")

# One row per event with whole-stream window totals; max_timestamp is the
# timestamp of the row with the highest event_id (the latest event).
# NOTE(review): assumes goal_nogoal is 1 for a goal and 0 for a miss.
stream_data = spark.sql('''
SELECT event_id AS _id,
       timestamp_of_goal AS timestamp,
       player_jersey_number,
       team_id,
       SUM(goal_nogoal) OVER (PARTITION BY team_id) AS team_score,
       COUNT(goal_nogoal) OVER (PARTITION BY team_id, player_jersey_number) AS player_shots,
       SUM(goal_nogoal) OVER (PARTITION BY team_id, player_jersey_number) AS player_goals,
       MAX(event_id) OVER () AS event_id,
       FIRST_VALUE(timestamp_of_goal) OVER (
           ORDER BY event_id DESC
           ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING
       ) AS max_timestamp
FROM games_data_1
ORDER BY _id
''')

stream_data.show(5)

# Attach player (team id + jersey number) and team reference data.
stream_player = stream_data.join(
    player,
    (stream_data.team_id == player.teamid)
    & (stream_data.player_jersey_number == player.PlayerJNumber),
)
stream_player_team = stream_player.join(teams, stream_player.team_id == teams.id_team)


#Creating a final dataframe needed for the schema
stream_player_team.createOrReplaceTempView("final")

final = spark.sql('''
SELECT _id, timestamp, team_id, name, conference, wins, losses, team_score, event_id, max_timestamp, PlayerID as player_id,
PlayerName as player_name, player_shots, player_goals 
FROM final ORDER BY _id
''')

final.createOrReplaceTempView("boxscore")

#Creating the needed columns
boxscore = spark.sql('''SELECT *, CASE WHEN team_id = 101 THEN "Home" ELSE "Away" END as Home_Away from boxscore
''')
# Qualified f.round so the Column function is used regardless of what the
# star import above does to the builtin `round`.
boxscore = boxscore.withColumn("pct", f.round(boxscore.player_goals / boxscore.player_shots, 2))

b1 = boxscore


def _box_score_side(side_team_id):
    """Return the team struct for rows whose team_id equals `side_team_id`, else null.

    The struct holds team_id, conference, wins, losses, team_score and a
    `players` array containing the row's single player entry.
    """
    player_entry = f.struct("player_id", "player_name", "player_shots", "player_goals", "pct")
    team_entry = f.struct(
        "team_id", "conference", "wins", "losses", "team_score",
        f.array(player_entry).alias("players"),
    )
    return f.when(col("team_id") == side_team_id, team_entry)


#Creating a custom schema
# NOTE(review): the home (101) and away (205) team ids are hard-coded.
custom_schema = b1.select(
    "_id",
    "timestamp",
    _box_score_side(101).alias("Home"),
    _box_score_side(205).alias("Away"),
)

#Writing data to boxscore collections
(
    custom_schema.write.format("mongo")
    .mode("overwrite")
    .option("database", "sidearm")
    .option("collection", "boxscore")
    .save()
)


![Alt text](image-15.png)
![Alt text](image-16.png)

##  <font color='black'> Q9) Write a drill SQL query to display all the box scores. </font>


In [None]:
-- Every box-score document stored in the sidearm.boxscore collection.
SELECT *
FROM mongo.sidearm.boxscore

![Alt text](image-17.png)

## <font color='black'> Q10) Write a drill SQL query to display the latest box score. </font>


In [None]:
-- Latest box score: the document with the highest _id. In this pipeline _id is
-- the gamestream event id, and Mongo keeps _id unique, so exactly one document
-- is returned. This avoids a window over the whole collection and does not add
-- a helper column to the result.
SELECT *
FROM mongo.sidearm.boxscore
ORDER BY _id DESC
LIMIT 1


![Alt text](image-18.png)
![Alt text](image-19.png)

## <font color='black'> Q11) When the game is complete, write pyspark code (in SQL or DataFrame API) to update the wins and losses for the teams in the teams table. Specifically, load the teams table and update it, then display the updated data frame.</font>


In [None]:
# Rank the distinct (team, final score) pairs from the box score, highest score
# first. RANK() gives tied scores the same rank.
# NOTE(review): in a tied game both teams get rank 1 and are both credited a
# win by the update below.
boxscore.createOrReplaceTempView("teams_2")
query3 = '''
SELECT team_id,
       team_score,
       RANK() OVER (ORDER BY team_score DESC) AS rnk
FROM (SELECT DISTINCT team_id, team_score FROM teams_2) AS team_scores
'''
rank_score = spark.sql(query3)
rank_score.show()



![Alt text](image-20.png)

In [None]:
# Attach each team's final score and rank to the teams reference data.
rank_condition = teams.id_team == rank_score.team_id
teams_rank = teams.join(rank_score, on=rank_condition, how="inner")
teams_rank.show()


![Alt text](image-21.png)

In [None]:
# Credit a win to the rank-1 team and a loss to the rank-2 team; all other
# values pass through unchanged.
teams_rank.createOrReplaceTempView("updates_teams")
teams_2 = spark.sql('''
SELECT id_team,
       name,
       conference,
       CASE WHEN rnk = 1 THEN wins + 1 ELSE wins END AS wins,
       CASE WHEN rnk = 2 THEN losses + 1 ELSE losses END AS losses
FROM updates_teams
''')

teams_2.show()


![Alt text](image-22.png)

## <font color='black'> Q12) Write pyspark code (in SQL or DataFrame API) to write the teams data updated in question 11 to a new mssql.sidearmdb.teams2 table.</font>


In [None]:
# Save the updated teams to the SQL Server table "teams_2" (overwrite mode).
(
    teams_2.write.format("com.microsoft.sqlserver.jdbc.spark")
    .mode("overwrite")
    .options(
        driver="com.microsoft.sqlserver.jdbc.SQLServerDriver",
        url=mssql_url,
        dbtable="teams_2",
        user=mssql_user,
        password=mssql_pw,
    )
    .save()
)


![Alt text](image-23.png)

## <font color='black'> Q13) When the game is complete, write pyspark code (in SQL or DataFrame API) to update the shots and goals for the players in the players table. Specifically, load the players table and update it, then display the updated data frame. </font>


In [None]:
#Updating the player shots and goals
boxscore.createOrReplaceTempView("player_3")
player.createOrReplaceTempView("player_2")

# Attach each player's game totals to their season totals.
# - The box score has no jersey-number column, so match on player_id, which is
#   the players-table id carried into the box score.
# - Box-score rows are per event, so the per-player totals are de-duplicated
#   in a subquery before joining.
# - LEFT JOIN + COALESCE keeps players who took no shots (game totals of 0)
#   instead of dropping them from the updated table.
updating_player = spark.sql('''
SELECT p.PlayerID, p.PlayerName, p.PlayerJNumber, p.shots, p.goals, p.teamid,
       COALESCE(s.player_shots, 0) AS player_shots,
       COALESCE(s.player_goals, 0) AS player_goals
FROM player_2 p
LEFT JOIN (SELECT DISTINCT player_id, player_shots, player_goals FROM player_3) s
       ON p.PlayerID = s.player_id
ORDER BY p.PlayerID
''')
updating_player.show()
updating_player.createOrReplaceTempView("updating_player")

# Season totals after this game.
player_2 = spark.sql('''
SELECT PlayerID, PlayerName, PlayerJNumber,
       (shots + player_shots) AS u_shots,
       (goals + player_goals) AS u_goals,
       teamid
FROM updating_player
''')
player_2.show()



![Alt text](image-24.png)
![Alt text](image-25.png)
![Alt text](image-26.png)

## <font color='black'> Q14) Write pyspark code (in SQL or DataFrame API) to write the players data updated in question 13 to a new mssql.sidearmdb.players table. </font>


In [None]:
#Question14
# Save the updated players to the SQL Server table "players2" (overwrite mode).
(
    player_2.write.format("com.microsoft.sqlserver.jdbc.spark")
    .mode("overwrite")
    .options(
        driver="com.microsoft.sqlserver.jdbc.SQLServerDriver",
        url=mssql_url,
        dbtable="players2",
        user=mssql_user,
        password=mssql_pw,
    )
    .save()
)


![Alt text](image-27.png)

## <font color='black'> Rewrite the Drill SQL query from question 1 to use the updated players2 and teams2 tables. </font>

In [None]:
-- Question 1 listing, read from the updated players2 / teams_2 tables.
SELECT t.name       AS Team_Name,
       t.wins       AS Team_wins,
       t.losses     AS Team_Losses,
       p.PlayerName AS Player_Name,
       p.u_shots    AS player_shots,
       p.u_goals    AS player_goals
FROM mssql.dbo.players2 p
JOIN mssql.dbo.teams_2 t ON p.teamid = t.id_team


![Alt text](image-2.png)
![Alt text](image-3.png)