In [None]:
# Thank you for your application for the Data Engineer role at src | ftbl.  We are overwhelmed with interest and are happy to share that you've been selected to participate in the next round. The hiring process will consist of 3 parts: 

# 1.     a take home project, 

# 2.     a technical interview and 

# 3.     a general interview.

# After we have interviewed all the candidates, we hope to make a decision as quickly as possible. We will try to keep candidates informed of the timelines as much as possible.


# For the take home project, we would like you to use the attached SkillCorner data to complete the following tasks:

# Task 1

# Overview:

# Write an ETL process to store the data in formats to be used by the data science team.  It is up to you to determine what the format and schema should be based on what you anticipate the questions of the data science team will be.

# Deliverables:

# · Source code for the ETL process

# · Sample output (please do not send back the full data set)

# · A write up explaining your thought process as well as answers to the following questions:

# Why did you choose this schema?
# What alternatives did you consider?
# o   What are the trade offs in terms of running time, compute cost and storage cost?

# o   Are there any concerns with scale?

# o   What might change if you were asked to merge this with event data (for example Statsbomb or WyScout)

 
# Task 2

# Overview:

# Using the transformed data from the first task, write a script in PySpark or an equivalent language, that determines for each player the most “intense” 5 minute segment of the match.  Additionally, calculate a metric called “spread” which is the distance between all players on the field at any moment in time and find the 2 minute segment with the highest spread. How do these two metrics relate to each other?

 

# Deliverables:

# · Source code for finding the most intense 5 minutes segment

# · Source code for calculating the spread

# · A small write up of how you defined and found the most intense 5 minute segment, the 2 minute segment with the highest spread, and a brief description of how these two metrics are related to each other

 

# What are we looking for?

# We are not looking for perfect or “right” answers - these are difficult questions and there are many unique ways to answer them.  We want to see how you interpret and approach the problems both from a technical and football perspective as well as how you communicate your thought process.

 

# Submissions

# Please submit your project as a single zip file to info@srcftbl.com along with the following information:

# 1.     Are you eligible to work in your current country of residence? If not, please explain your situation and desired resolution

# 2.     General availability for follow up interviews over the next three weeks

In [3]:
# Write an ETL process to store the data in formats to be used by the data science team.  It is up to you to determine what the format and schema should be based on what you anticipate the questions of the data science team will be.

# Deliverables:

# · Source code for the ETL process

# · Sample output (please do not send back the full data set)

# · A write up explaining your thought process as well as answers to the following questions:

# Why did you choose this schema?
# What alternatives did you consider?
# o   What are the trade offs in terms of running time, compute cost and storage cost?

# o   Are there any concerns with scale?

# o   What might change if you were asked to merge this with event data (for example Statsbomb or WyScout)

# Extract - Unzip Folder 
import zipfile
import os

def unzip_folder(zip_file_path, extract_to):
    if not zipfile.is_zipfile(zip_file_path):
        print("Invalid zip file.")
        return
    if not os.path.exists(extract_to):
        os.makedirs(extract_to)
    with zipfile.ZipFile(zip_file_path, 'r') as zip_ref:
        # Extract all contents to the specified directory
        zip_ref.extractall(extract_to)
        print("Folder successfully unzipped to:", extract_to)

if __name__ == "__main__":
    # Example usage:
    zip_file_path = "C:/Users/RaymondCarpenter/Documents/GitHub/14thstreetanalytics/src_ftbl/src_de_sample_data.zip" 
    extract_to = "C:/Users/RaymondCarpenter/Documents/GitHub/14thstreetanalytics/src_ftbl/src_de_sample_data"

    unzip_folder(zip_file_path, extract_to)


Folder successfully unzipped to: C:/Users/RaymondCarpenter/Documents/GitHub/14thstreetanalytics/src_ftbl/src_de_sample_data


In [80]:
import pandas as pd
import json
import os

# List of JSON file paths
json_files = ['C:/Users/RaymondCarpenter/Documents/GitHub/14thstreetanalytics/src_ftbl/src_de_sample_data/10000_metadata.json', 'C:/Users/RaymondCarpenter/Documents/GitHub/14thstreetanalytics/src_ftbl/src_de_sample_data/10017_metadata.json', 
              'C:/Users/RaymondCarpenter/Documents/GitHub/14thstreetanalytics/src_ftbl/src_de_sample_data/10013_metadata.json', 
              'C:/Users/RaymondCarpenter/Documents/GitHub/14thstreetanalytics/src_ftbl/src_de_sample_data/100094_metadata.json',
              'C:/Users/RaymondCarpenter/Documents/GitHub/14thstreetanalytics/src_ftbl/src_de_sample_data/10009_metadata.json']


# Initialize an empty DataFrame to store all data
all_data = pd.DataFrame()

# Process each JSON file
for json_file in json_files:
    # Read JSON data from the file
    with open(json_file, 'r') as file:
        json_data = json.load(file)

    # Flatten the nested structure of the main JSON
    flat_data = pd.json_normalize(json_data)

    # Unroll the 'players' column
    players_data = pd.json_normalize(json_data['players'])
    flat_data = pd.concat([flat_data, players_data], axis=1).drop('players', axis=1)

    # Add a new column for the 'id' from the JSON file
    flat_data['game_id'] = json_data['id']

    # Append the data to the all_data DataFrame
    all_data = all_data.append(flat_data, ignore_index=True)

# Save the combined data to a CSV file
all_csv_path = 'combined_metadata.csv'
all_data.to_csv(all_csv_path, index=False)

print(f'All CSV files have been combined into: {all_csv_path}')



All CSV files have been combined into: combined_metadata.csv


  all_data = all_data.append(flat_data, ignore_index=True)
  all_data = all_data.append(flat_data, ignore_index=True)
  all_data = all_data.append(flat_data, ignore_index=True)
  all_data = all_data.append(flat_data, ignore_index=True)
  all_data = all_data.append(flat_data, ignore_index=True)


In [89]:
import csv
import json

# Open the input file
input_file_path = '10000_tracking.txt'
with open(input_file_path, 'r') as input_file:
    # Read lines from the file
    lines = input_file.readlines()

# Output CSV file path
output_csv_path = 'output.csv'

# Open the CSV file for writing
with open(output_csv_path, 'w', newline='') as output_csv:
    # Create a CSV writer object
    csv_writer = csv.writer(output_csv)

    # Write header to CSV
    csv_writer.writerow(["track_id", "trackable_object", "is_visible", "x", "y", "z", "frame", "timestamp"])

    # Iterate through each line in the input file
    for line in lines:
        # Load JSON data from the line
        data = json.loads(line)

        # Extract relevant information
        frame = data.get("frame")
        timestamp = data.get("timestamp")

        for item in data.get("data", []):
            track_id = item.get("track_id")
            trackable_object = item.get("trackable_object")
            is_visible = item.get("is_visible")
            x = item.get("x", 0.0)
            y = item.get("y", 0.0)
            z = item.get("z", 0.0)

            # Write the extracted data to CSV
            csv_writer.writerow([track_id, trackable_object, is_visible, x, y, z, frame, timestamp])

print(f'CSV file created successfully at: {output_csv_path}')


CSV file created successfully at: output_metadata.csv


In [150]:
import pandas as pd
from io import StringIO

# Example CSV data

# Read CSV data into a DataFrame
df = pd.read_csv('C:/Users/RaymondCarpenter/Documents/GitHub/14thstreetanalytics/src_ftbl/combined_metadata.csv')

df.head()
# Define schemas based on related columns
DimStadium = df[['stadium.id', 'stadium.name', 'stadium.city', 'stadium.capacity','pitch_length','pitch_width']]
DimTeam = df[['home_team.id', 'home_team.name', 'home_team.short_name', 'home_team.acronym',
                   'away_team.id', 'away_team.name', 'away_team.short_name', 'away_team.acronym']]
DimKit = df[['home_team_kit.id', 'home_team_kit.team_id', 'home_team_kit.season.id',
                  'home_team_kit.season.start_year', 'home_team_kit.season.end_year',
                  'home_team_kit.season.name', 'home_team_kit.name', 'home_team_kit.jersey_color',
                  'home_team_kit.number_color', 'away_team_kit.id', 'away_team_kit.team_id',
                  'away_team_kit.season.id', 'away_team_kit.season.start_year',
                  'away_team_kit.season.end_year', 'away_team_kit.season.name',
                  'away_team_kit.name', 'away_team_kit.jersey_color', 'away_team_kit.number_color']]
DimSeason = df[['competition_edition.season.id','competition_edition.season.start_year','competition_edition.season.end_year']]
DimCoach = df[['home_team_coach.id', 'home_team_coach.first_name', 'home_team_coach.last_name',
                    'away_team_coach.id', 'away_team_coach.first_name', 'away_team_coach.last_name']]
DimCompetition = df[['competition_edition.id', 'competition_edition.competition.id',
                          'competition_edition.competition.area', 'competition_edition.competition.name',
                          'competition_edition.season.id', 'competition_edition.season.start_year',
                          'competition_edition.season.end_year', 'competition_edition.season.name',
                          'competition_edition.name', 'competition_round.id', 'competition_round.name',
                          'competition_round.round_number', 'competition_round.potential_overtime']]
DimCompetitionRound = df[['competition_round.id','competition_edition.name','competition_round.round_number','competition_round.potential_overtime']]
DimCompetionEdition = df[['competition_edition.id','competition_edition.competition.id','competition_edition.competition.area','competition_edition.competition.name','competition_edition.season.id']]
DimPlayer = df[['id.1', 'first_name', 'last_name', 'short_name', 'birthday', 'trackable_object',
                     'gender', 'player_role.id', 'player_role.name', 'player_role.acronym']]
DimPlayerRole = df[['player_role.id','player_role.name','player_role.acronym']]
FactMatch = df[['id', 'home_team_score','away_team_score','date_time','referees','status','home_team_side','stadium.id','home_team.id','away_team.id','competition_edition.season.id','home_team_coach.id','away_team_coach.id','competition_edition.id',
                'competition_edition.competition.id','competition_round.id','start_time','end_time','away_team_kit.id','ball.trackable_object',
                'yellow_card','red_card','goal','own_goal','injured']]

# # Save each schema to a separate CSV file
DimStadium.to_csv('DimStadium.csv', index=False)
DimTeam.to_csv('DimTeam.csv', index=False)
DimKit.to_csv('DimKit.csv', index=False)
DimCoach.to_csv('DimCoach.csv', index=False)
DimCompetition.to_csv('DimCompetition.csv', index=False)
DimPlayer.to_csv('DimPlayer.csv', index=False)
DimCompetionEdition.to_csv('DimCompetitionEdition.csv',index=False)
DimCompetitionRound.to_csv('DimCompetitionRound.csv',index=False)
DimSeason.to_csv('DimSeason.csv',index=False)
DimPlayerRole.to_csv('DimPlayerRole.csv',index=False)
FactMatch.to_csv('FactMatch.csv',index=False)

In [151]:
import csv
import json
import os
import re

# List of input files
input_files = ['C:/Users/RaymondCarpenter/Documents/GitHub/14thstreetanalytics/src_ftbl/src_de_sample_data/10000_tracking.txt', 
               'C:/Users/RaymondCarpenter/Documents/GitHub/14thstreetanalytics/src_ftbl/src_de_sample_data/10009_tracking.txt', 
               'C:/Users/RaymondCarpenter/Documents/GitHub/14thstreetanalytics/src_ftbl/src_de_sample_data/10013_tracking.txt', 
               'C:/Users/RaymondCarpenter/Documents/GitHub/14thstreetanalytics/src_ftbl/src_de_sample_data/100094_tracking.txt', 
               'C:/Users/RaymondCarpenter/Documents/GitHub/14thstreetanalytics/src_ftbl/src_de_sample_data/10017_tracking.txt']

# Output CSV file path
output_csv_path = 'DimTracking.csv'

# Open the CSV file for writing
with open(output_csv_path, 'w', newline='') as output_csv:
    # Create a CSV writer object
    csv_writer = csv.writer(output_csv)

    # Write header to CSV
    csv_writer.writerow(["game_id", "track_id", "trackable_object", "is_visible", "x", "y", "z", "frame", "timestamp"])

    # Iterate through each input file
    for input_file_path in input_files:
        # Extract game_id using regular expression
        match = re.search(r'(\d+)_tracking\.txt', os.path.basename(input_file_path))
        game_id = match.group(1) if match else None

        # Open the input file
        with open(input_file_path, 'r') as input_file:
            # Read lines from the file
            lines = input_file.readlines()

        # Iterate through each line in the input file
        for line in lines:
            # Load JSON data from the line
            data = json.loads(line)

            # Extract relevant information
            frame = data.get("frame")
            timestamp = data.get("timestamp")

            for item in data.get("data", []):
                track_id = item.get("track_id")
                trackable_object = item.get("trackable_object")
                is_visible = item.get("is_visible")
                x = item.get("x", 0.0)
                y = item.get("y", 0.0)
                z = item.get("z", 0.0)

                # Write the extracted data to CSV
                csv_writer.writerow([game_id, track_id, trackable_object, is_visible, x, y, z, frame, timestamp])

print(f'CSV file created successfully at: {output_csv_path}')


CSV file created successfully at: DimTracking.csv


In [107]:
import pandas as pd

# Assuming your CSV file is named "your_file.csv"
file_path = "C:/Users/RaymondCarpenter/Documents/GitHub/14thstreetanalytics/src_ftbl/output_tracking.csv"

# Read the CSV file into a pandas DataFrame
df = pd.read_csv(file_path)

# Filter the DataFrame based on the condition
filtered_df = df[df['trackable_object'] == 18731]

# Display the filtered DataFrame
filtered_df.to_csv('mcgoldrick_tracking.csv')


In [186]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, lag, sqrt, sum,when
from pyspark.sql.window import Window

# Create a Spark session
spark = SparkSession.builder.appName("intense_spread_analysis").getOrCreate()

df = spark.read.csv("C:/Users/RaymondCarpenter/Documents/GitHub/14thstreetanalytics/src_ftbl/mcgoldrick_tracking.csv", header=True, inferSchema=True)

# Define a window specification based on timestamp
time_window_spec = Window().orderBy("timestamp")

# Calculate distance between players using Euclidean distance
def calculate_distance(x1, y1, z1, x2, y2, z2):
    return sqrt((x1 - x2)**2 + (y1 - y2)**2 + (z1 - z2)**2)

# Calculate distance between all players for each frame
df = df.withColumn("distance", 
                  sum(when(
                      lag("x").over(time_window_spec).isNotNull(),
                      calculate_distance(
                          col("x"), col("y"), col("z"),
                          lag("x").over(time_window_spec),
                          lag("y").over(time_window_spec),
                          lag("z").over(time_window_spec)
                      )
                  ).otherwise(0)).over(time_window_spec))


# Define a window specification based on frame
frame_window_spec = Window().orderBy("frame").rangeBetween(-5 * 60, 0)

# Calculate intensity as the sum of distances for each player over a 5-minute window
df_intensity = df.withColumn("intensity", sum("distance").over(frame_window_spec))

# Find the most intense 5-minute segment for each player
df_most_intense = df_intensity.groupBy(
    "trackable_object", "game_id", "track_id", "is_visible", "intensity"
).agg(
    col("intensity")
).orderBy(col("intensity").desc()).limit(1)

# Calculate spread for each frame
df_spread = df.withColumn("spread", sum("distance").over(time_window_spec))

# Define a window specification based on frame for the 2-minute segment
frame_window_spec_2min = Window().orderBy("frame").rangeBetween(-2 * 60, 0)

# Find the 2-minute segment with the highest spread
df_highest_spread = df_spread.withColumn("spread_2min", sum("distance").over(frame_window_spec_2min))

# Show the results
df_most_intense.show()
df_highest_spread.orderBy(col("spread_2min").desc()).limit(1).show()

# Stop the Spark session
spark.stop()


+----------------+-------+--------+----------+-----------------+-----------------+
|trackable_object|game_id|track_id|is_visible|        intensity|        intensity|
+----------------+-------+--------+----------+-----------------+-----------------+
|           18731|  10000|   18731|      true|673305.5596246592|673305.5596246592|
+----------------+-------+--------+----------+-----------------+-----------------+

+-------+-------+--------+----------------+----------+-----+-----+---+-----+---------+-----------------+--------------------+------------------+
|    _c0|game_id|track_id|trackable_object|is_visible|    x|    y|  z|frame|timestamp|         distance|              spread|       spread_2min|
+-------+-------+--------+----------------+----------+-----+-----+---+-----+---------+-----------------+--------------------+------------------+
|1051026|  10000|   18731|           18731|      true|10.85|-20.2|0.0|57747| 01:33:07|2252.745064556146|1.0230466380205823E7|271897.55060692166|
+---

In [214]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, window, sum
from pyspark.sql.types import FloatType, TimestampType

# Step 1: Create SparkSession
spark = SparkSession.builder.appName("intense_window").getOrCreate()

# Step 2: Load DataFrame
# Assuming your DataFrame is named 'df'
# If not, replace 'df' with the actual name of your DataFrame
df = spark.read.csv("C:/Users/RaymondCarpenter/Documents/GitHub/14thstreetanalytics/src_ftbl/mcgoldrick_tracking.csv", header=True, inferSchema=True)
# Step 3: Convert timestamp to seconds and cast to TimestampType
df = df.withColumn("timestamp_seconds", col("timestamp").cast(FloatType()))
df = df.withColumn("timestamp", col("timestamp").cast(TimestampType()))

# Step 4: Aggregate into 5-minute windows, calculating intensity as the sum of distance
intensity_df = df.groupBy(window("timestamp", "5 minutes")).agg(
    sum("x").alias("total_x"),
    sum("y").alias("total_y"),
    sum("z").alias("total_z")
)

# Calculate overall intensity (you may need to customize this based on your specific metrics)
intensity_df = intensity_df.withColumn("intensity", col("total_x") + col("total_y") + col("total_z"))

# Step 5: Find the 5-minute window with maximum intensity
max_intensity_window = intensity_df.orderBy(col("intensity").desc()).first()

# Display the result
print("Most intense 5-minute window:")
print("Window:", max_intensity_window["window"])
print("Intensity:", max_intensity_window["intensity"])

# Stop SparkSession
spark.stop()


Most intense 5-minute window:
Window: Row(start=datetime.datetime(2023, 12, 8, 1, 25), end=datetime.datetime(2023, 12, 8, 1, 30))
Intensity: 3149.350000000028


In [215]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, window, sum
from pyspark.sql.types import FloatType, TimestampType
from datetime import datetime, timedelta

# Step 1: Create SparkSession
spark = SparkSession.builder.appName("intense_window").getOrCreate()

# Step 2: Load DataFrame
# Assuming your DataFrame is named 'df'
# If not, replace 'df' with the actual name of your DataFrame
df = spark.read.csv("C:/Users/RaymondCarpenter/Documents/GitHub/14thstreetanalytics/src_ftbl/mcgoldrick_tracking.csv", header=True, inferSchema=True)

# Step 3: Convert timestamp to seconds and cast to TimestampType
df = df.withColumn("timestamp_seconds", col("timestamp").cast(FloatType()))
df = df.withColumn("timestamp", col("timestamp").cast(TimestampType()))

# Step 4: Aggregate into 5-minute windows, calculating intensity as the sum of distance
intensity_df = df.groupBy(window("timestamp", "5 minutes")).agg(
    sum("x").alias("total_x"),
    sum("y").alias("total_y"),
    sum("z").alias("total_z")
)

# Calculate overall intensity (you may need to customize this based on your specific metrics)
intensity_df = intensity_df.withColumn("intensity", col("total_x") + col("total_y") + col("total_z"))

# Step 5: Find the 5-minute window with maximum intensity
max_intensity_window = intensity_df.orderBy(col("intensity").desc()).first()

# Convert the output to soccer game minute terms (assuming game duration is 92 minutes)
game_duration = 92
start_minute = int((max_intensity_window["window"]["start"].minute + max_intensity_window["window"]["start"].hour * 60) / 5)
end_minute = int((max_intensity_window["window"]["end"].minute + max_intensity_window["window"]["end"].hour * 60) / 5)

# Display the result
print("Most intense 5-minute window:")
print("Window: From {} to {} minutes".format(start_minute, end_minute))
print("Intensity:", max_intensity_window["intensity"])

# Stop SparkSession
spark.stop()


Most intense 5-minute window:
Window: From 17 to 18 minutes
Intensity: 3149.350000000028


In [217]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, window, sum
from pyspark.sql.types import FloatType, TimestampType
from datetime import datetime, timedelta

# Step 1: Create SparkSession
spark = SparkSession.builder.appName("intense_window").getOrCreate()

# Step 2: Load DataFrame
# Assuming your DataFrame is named 'df'
# If not, replace 'df' with the actual name of your DataFrame
df = spark.read.csv("C:/Users/RaymondCarpenter/Documents/GitHub/14thstreetanalytics/src_ftbl/mcgoldrick_tracking.csv", header=True, inferSchema=True)

# Step 3: Convert timestamp to seconds and cast to TimestampType
df = df.withColumn("timestamp_seconds", col("timestamp").cast(FloatType()))
df = df.withColumn("timestamp", col("timestamp").cast(TimestampType()))

# Step 4: Aggregate into 5-minute windows, calculating intensity as the sum of distance
intensity_df = df.groupBy(window("timestamp", "5 minutes")).agg(
    sum("x").alias("total_x"),
    sum("y").alias("total_y"),
    sum("z").alias("total_z")
)

# Calculate overall intensity (you may need to customize this based on your specific metrics)
intensity_df = intensity_df.withColumn("intensity", col("total_x") + col("total_y") + col("total_z"))

# Step 5: Find the 5-minute window with maximum intensity
max_intensity_window = intensity_df.orderBy(col("intensity").desc()).first()

# Convert the output to soccer game minute terms (assuming game duration is 92 minutes)
game_duration = 92
start_minute = int((max_intensity_window["window"]["start"].minute + max_intensity_window["window"]["start"].hour * 60) / 5) * 5
end_minute = start_minute + 5

# Display the result
print("Most intense 5-minute window:")
print("Window: From {} to {} minutes".format(start_minute, end_minute))
print("Intensity:", max_intensity_window["intensity"])

# Stop SparkSession
spark.stop()


Most intense 5-minute window:
Window: From 85 to 90 minutes
Intensity: 3149.350000000028


In [220]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, window, sum, max, first
from pyspark.sql.types import FloatType, TimestampType

# Step 1: Create SparkSession
spark = SparkSession.builder.appName("intense_window").getOrCreate()

# Step 2: Load DataFrame from CSV
# Replace 'your_file_path.csv' with the actual path to your CSV file
df = spark.read.csv("C:/Users/RaymondCarpenter/Documents/GitHub/14thstreetanalytics/src_ftbl/output_tracking.csv", header=True)

# Step 3: Convert timestamp to seconds and cast to TimestampType
df = df.withColumn("timestamp_seconds", col("timestamp").cast(FloatType()))
df = df.withColumn("timestamp", col("timestamp").cast(TimestampType()))

# Step 4: Aggregate into 5-minute windows, calculating intensity as the sum of distance
intensity_df = df.groupBy("game_id", window("timestamp", "5 minutes")).agg(
    sum("x").alias("total_x"),
    sum("y").alias("total_y"),
    sum("z").alias("total_z"),
    window("timestamp", "5 minutes").start.alias("window_start"),
    window("timestamp", "5 minutes").end.alias("window_end")
)

# Calculate overall intensity (you may need to customize this based on your specific metrics)
intensity_df = intensity_df.withColumn("intensity", col("total_x") + col("total_y") + col("total_z"))

# Step 5: Find the 5-minute window with maximum intensity for each game
max_intensity_per_game = intensity_df.groupBy("game_id").agg(
    max("intensity").alias("max_intensity"),
    first("window_start").alias("max_intensity_window_start"),
    first("window_end").alias("max_intensity_window_end")
)

# Display the result
print("Most intense 5-minute window for each game:")
max_intensity_per_game.show(truncate=False)

# Stop SparkSession
spark.stop()


Most intense 5-minute window for each game:
+-------+------------------+--------------------------+------------------------+
|game_id|max_intensity     |max_intensity_window_start|max_intensity_window_end|
+-------+------------------+--------------------------+------------------------+
|10009  |1007035.169999998 |2023-12-08 00:05:00       |2023-12-08 00:10:00     |
|10000  |734362.7400000067 |2023-12-08 00:25:00       |2023-12-08 00:30:00     |
|10013  |691442.9399999985 |2023-12-08 00:20:00       |2023-12-08 00:25:00     |
|10017  |923188.0400000022 |2023-12-08 00:20:00       |2023-12-08 00:25:00     |
|100094 |1262382.6200000085|2023-12-08 00:05:00       |2023-12-08 00:10:00     |
+-------+------------------+--------------------------+------------------------+



In [221]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, window, sum, max, first
from pyspark.sql.types import FloatType, TimestampType

# Step 1: Create SparkSession
spark = SparkSession.builder.appName("intense_window").getOrCreate()

# Step 2: Load DataFrame from CSV
# Replace 'your_file_path.csv' with the actual path to your CSV file
df = spark.read.csv("C:/Users/RaymondCarpenter/Documents/GitHub/14thstreetanalytics/src_ftbl/output_tracking.csv", header=True)

# Step 3: Convert timestamp to seconds and cast to TimestampType
df = df.withColumn("timestamp_seconds", col("timestamp").cast(FloatType()))
df = df.withColumn("timestamp", col("timestamp").cast(TimestampType()))

# Step 4: Load player DataFrame from CSV
player_df = spark.read.csv("C:/Users/RaymondCarpenter/Documents/GitHub/14thstreetanalytics/src_ftbl/DimPlayer.csv", header=True)

# Step 5: Join the tracking data with player information
joined_df = df.join(player_df, df["trackable_object"] == player_df["trackable_object"])

# Step 6: Aggregate into 5-minute windows, calculating intensity as the sum of distance
intensity_df = joined_df.groupBy("game_id", "short_name", window("timestamp", "5 minutes")).agg(
    sum("x").alias("total_x"),
    sum("y").alias("total_y"),
    sum("z").alias("total_z"),
    window("timestamp", "5 minutes").start.alias("window_start"),
    window("timestamp", "5 minutes").end.alias("window_end")
)

# Calculate overall intensity (you may need to customize this based on your specific metrics)
intensity_df = intensity_df.withColumn("intensity", col("total_x") + col("total_y") + col("total_z"))

# Step 7: Find the 5-minute window with maximum intensity for each player, for each game
max_intensity_per_player = intensity_df.groupBy("game_id", "short_name").agg(
    max("intensity").alias("max_intensity"),
    first("window_start").alias("max_intensity_window_start"),
    first("window_end").alias("max_intensity_window_end")
)

# Display the result
print("Most intense 5-minute window for each player, for each game:")
max_intensity_per_player.show(truncate=False)

# Stop SparkSession
spark.stop()


Most intense 5-minute window for each player, for each game:
+-------+----------------+------------------+--------------------------+------------------------+
|game_id|short_name      |max_intensity     |max_intensity_window_start|max_intensity_window_end|
+-------+----------------+------------------+--------------------------+------------------------+
|10000  |G. Wijnaldum    |37136.00999999992 |2023-12-08 00:40:00       |2023-12-08 00:45:00     |
|10013  |A. Saint-Maximin|19391.159999999967|2023-12-08 00:35:00       |2023-12-08 00:40:00     |
|10017  |P. Aubameyang   |79886.51000000002 |2023-12-08 00:05:00       |2023-12-08 00:10:00     |
|10017  |J. Maddison     |29165.170000000027|2023-12-08 00:40:00       |2023-12-08 00:45:00     |
|100094 |G. Hanley       |98093.33999999982 |2023-12-08 00:05:00       |2023-12-08 00:10:00     |
|100094 |A. Broja        |87644.4399999998  |2023-12-08 00:10:00       |2023-12-08 00:15:00     |
|10000  |G. Baldock      |72300.73000000001 |2023-12-08 0

In [280]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, window, sum, max, first, date_format, hour, minute
from pyspark.sql.types import FloatType, TimestampType
from datetime import datetime, timedelta

# Step 1: Create SparkSession
spark = SparkSession.builder.appName("intense_window1").getOrCreate()

# Step 2: Load DataFrame from CSV
# Replace 'your_file_path.csv' with the actual path to your CSV file
df = spark.read.csv("C:/Users/RaymondCarpenter/Documents/GitHub/14thstreetanalytics/src_ftbl/output_tracking.csv", header=True)

# Step 3: Convert timestamp to seconds and cast to TimestampType
df = df.withColumn("timestamp_seconds", col("timestamp").cast(FloatType()))
df = df.withColumn("timestamp", col("timestamp").cast(TimestampType()))

# Step 4: Load player DataFrame from CSV
player_df = spark.read.csv("C:/Users/RaymondCarpenter/Documents/GitHub/14thstreetanalytics/src_ftbl/DimPlayer.csv", header=True)

# Step 5: Join the tracking data with player information
joined_df = df.join(player_df, df["trackable_object"] == player_df["trackable_object"])

# Step 6: Aggregate into 5-minute windows, calculating intensity as the sum of distance
intensity_df = joined_df.groupBy("game_id", "short_name", window("timestamp", "5 minutes")).agg(
    sum("x").alias("total_x"),
    sum("y").alias("total_y"),
    sum("z").alias("total_z"),
    window("timestamp", "5 minutes").start.alias("window_start"),
    window("timestamp", "5 minutes").end.alias("window_end")
)

# Calculate overall intensity (you may need to customize this based on your specific metrics)
intensity_df = intensity_df.withColumn("intensity", col("total_x") + col("total_y") + col("total_z"))

# Step 7: Find the 5-minute window with maximum intensity for each player, for each game
max_intensity_per_player = intensity_df.groupBy("game_id", "short_name").agg(
    max("intensity").alias("max_intensity"),
    first(date_format("window_start", "HH:mm:ss")).alias("max_intensity_window_start"),
    first(date_format("window_end", "HH:mm:ss")).alias("max_intensity_window_end")
)

# Calculate the game duration (adjust accordingly)
game_duration = 92

# Convert the output to soccer game minute terms
max_intensity_per_player = max_intensity_per_player.withColumn(
    "max_intensity_window_start_minute",
    (hour("max_intensity_window_start") * 60 + minute("max_intensity_window_start")) % game_duration
)

max_intensity_per_player = max_intensity_per_player.withColumn(
    "max_intensity_window_end_minute",
    (hour("max_intensity_window_end") * 60 + minute("max_intensity_window_end")) % game_duration
)

# Display the result
print("Most intense 5-minute window for each player, for each game:")
max_intensity_per_player.show(truncate=False)

# Stop SparkSession
spark.stop()


ConnectionRefusedError: [WinError 10061] No connection could be made because the target machine actively refused it