In [1]:
!hdfs dfs -put data/football /data/football

In [2]:
!hdfs dfs -cat /data/football/* | head -n 10

Round,Date,Time,Home_Team,Home_Score,Away_Score,Away_Team,Home_Score_AET,Away_Score_AET,Home_Penalties,Away_Penalties,Home_Points,Away_Points,season,Country,Competition
ROUND 1,31/08/2002,22:30,RACING SANTANDER,0,1,VALLADOLID,,,,,0.0,3.0,2002,spain,primera-division
ROUND 1,01/09/2002,21:00,RAYO VALLECANO,2,2,ALAVES,,,,,1.0,1.0,2002,spain,primera-division
ROUND 1,01/09/2002,21:00,REAL SOCIEDAD,4,2,ATHLETIC BILBAO,,,,,3.0,0.0,2002,spain,primera-division
ROUND 1,01/09/2002,22:00,MALLORCA,0,2,VALENCIA,,,,,0.0,3.0,2002,spain,primera-division
ROUND 1,01/09/2002,22:30,VILLARREAL,2,2,OSASUNA,,,,,1.0,1.0,2002,spain,primera-division
ROUND 1,01/09/2002,22:30,RECREATIVO HUELVA,2,3,MALAGA,,,,,0.0,3.0,2002,spain,primera-division
ROUND 1,01/09/2002,23:00,BARCELONA,2,2,ATLETICO MADRID,,,,,1.0,1.0,2002,spain,primera-division
ROUND 1,01/09/2002,23:00,DEPORTIVO LA CORUNA,2,4,BETIS,,,,,0.0,3.0,2002,spain,primera-division
ROUND 1,01/09/2002,23:30,SEVILLA,0,1,CELTA VIGO,,,,,0.0,3.0,2002,spain,prime

In [47]:
%%writefile mapper.py

import sys

won = lambda i: 1 if i == 3 else 0

for line in sys.stdin:
    
    try:
        tokens = line.strip().split(',')
        team_home = tokens[3]
        team_away = tokens[6]
        won_home = won(int(float(tokens[-5])))
        won_away = won(int(float(tokens[-4])))
        
    except:
        continue
        
    print(f'{team_home.upper()}\t{won_home}')
    print(f'{team_away.upper()}\t{won_away}')

Overwriting mapper.py


In [46]:
%%writefile reducer.py

import sys

curr_team = None
curr_wins = 0
curr_count = 0

for line in sys.stdin:
    
    try:
        team, won = line.split('\t', 1)
        won = int(won)
        
    except:
        continue
        
    if team != curr_team:
        if curr_team:
            print(f'{curr_team}\t{curr_wins}\t{curr_count}')
        curr_team = team
        curr_wins = 0
        curr_count = 0
    curr_wins += won
    curr_count += 1

if curr_team:
    print(f'{curr_team}\t{curr_wins}\t{curr_count}')

Overwriting reducer.py


In [45]:
%%writefile mapper_stats.py

import sys

for line in sys.stdin:
    
    try:
        team, wins, total = line.split('\t', 2)
        wins = int(wins)
        total = int(total)
        
    except:
        continue
        
    perc = (1.0 * wins) / total
    
    print(f'{team}\t{perc}')

Overwriting mapper_stats.py


In [75]:
%%writefile reducer_stats.py

import sys
import re

for line in sys.stdin:
    
    try:
        team, stat = line.strip().split('\t', 1)
        team = re.sub('\s', '_', team)
        stat = round(float(stat), 2)
    except:
        continue
        
    print(f'{stat}\t{team}')

Overwriting reducer_stats.py


In [79]:
%%bash

INPUT='/data/football/'
OUTPUT='football_matches_'$(date +'%s')
OUTPUT_FINAL='football_stats_'$(date +'%s')

yarn jar /opt/cloudera/parcels/CDH/lib/hadoop-mapreduce/hadoop-streaming.jar \
    -D mapreduce.job.name='football_matches' \
    -D mapreduce.job.reduces=8 \
    -files mapper.py,reducer.py \
    -mapper "python3 mapper.py" \
    -reducer "python3 reducer.py" \
    -input ${INPUT} \
    -output ${OUTPUT} > /dev/null

yarn jar /opt/cloudera/parcels/CDH/lib/hadoop-mapreduce/hadoop-streaming.jar \
    -D mapreduce.job.name='football_stats' \
    -D mapreduce.job.reduces=1 \
    -D mapreduce.job.output.key.comparator.class=org.apache.hadoop.mapreduce.lib.partition.KeyFieldBasedComparator \
    -D mapreduce.partition.keycomparator.options="-nr" \
    -files mapper_stats.py,reducer_stats.py \
    -mapper "python3 mapper_stats.py" \
    -reducer "python3 reducer_stats.py" \
    -input ${OUTPUT} \
    -output ${OUTPUT_FINAL} > /dev/null

22/10/29 17:14:54 INFO client.RMProxy: Connecting to ResourceManager at /0.0.0.0:8032
22/10/29 17:14:55 INFO client.RMProxy: Connecting to ResourceManager at /0.0.0.0:8032
22/10/29 17:14:55 INFO mapred.FileInputFormat: Total input files to process : 1
22/10/29 17:14:55 INFO mapreduce.JobSubmitter: number of splits:2
22/10/29 17:14:56 INFO Configuration.deprecation: yarn.resourcemanager.system-metrics-publisher.enabled is deprecated. Instead, use yarn.system-metrics-publisher.enabled
22/10/29 17:14:56 INFO mapreduce.JobSubmitter: Submitting tokens for job: job_1667060446092_0041
22/10/29 17:14:56 INFO conf.Configuration: resource-types.xml not found
22/10/29 17:14:56 INFO resource.ResourceUtils: Unable to find 'resource-types.xml'.
22/10/29 17:14:56 INFO resource.ResourceUtils: Adding resource type - name = memory-mb, units = Mi, type = COUNTABLE
22/10/29 17:14:56 INFO resource.ResourceUtils: Adding resource type - name = vcores, units = , type = COUNTABLE
22/10/29 17:14:56 INFO impl.Ya

In [80]:
!hdfs dfs -cat /user/jovyan/football_stats_1667063693/part-* | head -n 100

0.38	1899_HOFFENHEIM
0.43	ZENIT_SAINT_PETERSBURG
0.43	ZAMORA
0.32	ZARAGOZA
0.33	ZULTE-WAREGEM
0.0	ZFC_MEUSELWITZ
0.22	ZORYA_LUHANSK
0.22	YORK
0.0	YATE_TOWN
0.0	YTRAC
0.0	YVETOT_AC
0.5	YZEURE_AS
0.33	YECLANO_DEPORTIVO
0.4	YEOVIL
0.5	YEADING
0.0	XEREZ_DEPORTIVO
0.3	XEREZ
0.38	WYCOMBE
0.31	WIGAN
0.0	WESTFIELDS
0.25	WALDHOF_MANNHEIM
0.29	WORCESTER
0.55	WASQUEHAL
0.33	WEICHE_FLENSBURG
0.36	WREXHAM
0.17	WEYMOUTH
0.29	WOLFSBERGER_AC
0.25	WESTON_SUPER_MARE
0.4	WERDER_BREMEN_II
0.2	WHITEHAWK
0.5	WARRINGTON_TOWN
0.0	WARE
0.25	WEST_BROM
0.27	WOKING
0.18	WACKER_BURGHAUSEN
0.2	WORMATIA_WORMS
0.32	WOLVERHAMPTON
0.33	WALSALL
0.0	WHITBY
0.34	WEST_HAM
0.29	WATFORD
0.38	WISLA_KRAKOW
0.0	WURZBURGER_FV
0.0	WEALDSTONE
0.0	WACKER_NORDHAUSEN
0.4	WUPPERTALER_SV
0.0	WORKINGTON
0.43	WERDER_BREMEN
0.4	WELLING
0.43	VERSAILLES_78_FC
0.0	VITREENNE
0.62	VITRE
0.33	VIRTUS_FRANCAVILLA
0.44	VIRTUS_ENTELLA
0.0	VILLENAVE
0.43	VILLARREAL
0.36	VILLANOVENSE
0.0	VICTOR