In [152]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, broadcast
# Reuse the already-running session when there is one; otherwise create it.
spark = SparkSession.builder.appName("SparkHomework").getOrCreate()

# Turn off size-based automatic broadcast joins so that only joins wrapped in
# an explicit broadcast() hint are broadcast.
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", "-1")

# The bucketed tables written later are scanned as DataSource V2 tables
# (the plan shows BatchScan on the Iceberg catalog). For V2 sources Spark only
# uses the bucket layout to avoid a shuffle when storage-partitioned joins are
# enabled, and Iceberg only reports that layout when data grouping is preserved.
# NOTE(review): re-run df_bucket.explain() after setting these and confirm the
# Exchange on match_id is gone for this Spark/Iceberg version.
spark.conf.set("spark.sql.sources.v2.bucketing.enabled", "true")
spark.conf.set("spark.sql.iceberg.planning.preserve-data-grouping", "true")


25/07/02 19:35:06 WARN SparkSession: Using an existing Spark session; only runtime SQL configurations will take effect.


In [153]:
# Directory holding the raw homework CSV files; change it here only.
DATA_DIR = "/home/iceberg/data"


def load_csv(name):
    """Read ``<DATA_DIR>/<name>.csv`` into a DataFrame, using the first row as header.

    No schema is supplied or inferred, so the reader's default column types apply.
    """
    return spark.read.option("header", "true").csv(f"{DATA_DIR}/{name}.csv")


medals_df = load_csv("medals")
maps_df = load_csv("maps")
matches_df = load_csv("matches")
# Aliased so its columns can be qualified as match_details.<column>.
match_details_df = load_csv("match_details").alias("match_details")
medals_matches_players_df = load_csv("medals_matches_players")

In [154]:
# Peek at a single row to see which columns match_details provides.
match_details_df.show(n=1)

+--------------------+---------------+---------------------+------------+-----------------+--------+-----------------+------------------------+------------+---------------------------------+-----------------+----------------+-----------------------+-----------+--------------------------------+----------------+-------------------+---------------+-------------------+------------------+----------------------+--------------------------+-------------------------+------------------------+-------------------------+---------------------------+-------------------------------+--------------------------------+---------------------------+--------------------------------+-------------------------------+-------------------+--------------------+--------------------------+-------+-------+
|            match_id|player_gamertag|previous_spartan_rank|spartan_rank|previous_total_xp|total_xp|previous_csr_tier|previous_csr_designation|previous_csr|previous_csr_percent_to_next_tier|previous_csr_rank|current_

In [155]:
# Enrich every medal row with its medal, match and map attributes.
# Each lookup side is wrapped in broadcast() because automatic broadcasting
# was disabled when the session was configured.
df_broadcast = (
    medals_matches_players_df
    .join(broadcast(medals_df), on="medal_id", how="left")
    .join(broadcast(matches_df), on="match_id", how="left")
    .join(broadcast(maps_df), on="mapid", how="left")
)

In [156]:
# Show the medal id -> name lookup (first 20 rows by default).
medal_names = medals_df.select("medal_id", "name")
medal_names.show()

+----------+--------------+
|  medal_id|          name|
+----------+--------------+
|2315448068|          NULL|
|3565441934|          NULL|
|4162659350| Buzzer Beater|
|1573153198|    Vanquisher|
| 298813630|Spartan Charge|
|3824002610|  Ghost Assist|
|3324603383|    Grunt Kill|
| 979431049|       Bifecta|
|3098362934|  Perfect Kill|
|2435743433|  Base Defense|
|2430242797| Killing Spree|
|1427531503| Team Takedown|
|2359847435|    Extinction|
|1691836029|   Hard Target|
|2766284219|     Quickdraw|
|3354395650| Capture Spree|
|2564994165|Big Gun Runner|
|2896365521| Team Takedown|
|3786961025|      Fastball|
| 243900335|  Shotgun Kill|
+----------+--------------+
only showing top 20 rows



In [157]:
# Same one-row preview of match_details as in the earlier cell.
match_details_df.show(1)

+--------------------+---------------+---------------------+------------+-----------------+--------+-----------------+------------------------+------------+---------------------------------+-----------------+----------------+-----------------------+-----------+--------------------------------+----------------+-------------------+---------------+-------------------+------------------+----------------------+--------------------------+-------------------------+------------------------+-------------------------+---------------------------+-------------------------------+--------------------------------+---------------------------+--------------------------------+-------------------------------+-------------------+--------------------+--------------------------+-------+-------+
|            match_id|player_gamertag|previous_spartan_rank|spartan_rank|previous_total_xp|total_xp|previous_csr_tier|previous_csr_designation|previous_csr|previous_csr_percent_to_next_tier|previous_csr_rank|current_

In [175]:
%%sql

-- Make sure the schema that the saveAsTable calls write into exists.
CREATE DATABASE IF NOT EXISTS bootcamp

In [182]:
# Persist the three match-keyed datasets as tables bucketed into 16 buckets on
# match_id, sorting each partition by match_id before the write. Existing
# tables with the same name are overwritten.
bucketed_targets = [
    (match_details_df, "bootcamp.match_details_bucketed"),
    (matches_df, "bootcamp.matches_bucketed"),
    (medals_matches_players_df, "bootcamp.medals_matches_players_bucketed"),
]
for source_df, target_table in bucketed_targets:
    ordered_df = source_df.sortWithinPartitions(col("match_id"))
    ordered_df.write.mode("overwrite").bucketBy(16, "match_id").saveAsTable(target_table)

                                                                                

In [199]:
# Join the three bucketed tables into one analysis dataset:
#   * match_details is the driving table (left joins keep all of its rows),
#   * matches adds playlist_id, mapid and completion_date via match_id,
#   * medals_matches_players adds medal_id via (match_id, player_gamertag).
# NOTE(review): the medal join presumably yields several rows per
# (match_id, player_gamertag) when a player has more than one medal row, so
# downstream aggregates must dedupe at the grain they need - confirm.
df_bucket = spark.sql("""
select
    match_id,
    medal_id,
    player_gamertag,
    playlist_id,
    player_total_kills,
    mapid,
    completion_date
    
from bootcamp.match_details_bucketed detail
left join bootcamp.matches_bucketed metches using(match_id)
left join bootcamp.medals_matches_players_bucketed medals_metches using(match_id,player_gamertag)
""")

In [200]:
# Expose the joined DataFrame to SQL under the name player_match_bucket,
# replacing any temp view already registered with that name.
df_bucket.createOrReplaceTempView("player_match_bucket")

In [201]:
# Print the physical plan; Exchange nodes in it mean the join still shuffles.
df_bucket.explain()

== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- Project [match_id#6571, medal_id#6619, player_gamertag#6572, playlist_id#6610, player_total_kills#6590, mapid#6608, completion_date#6613]
   +- SortMergeJoin [match_id#6571, player_gamertag#6572], [match_id#6617, player_gamertag#6618], LeftOuter
      :- Sort [match_id#6571 ASC NULLS FIRST, player_gamertag#6572 ASC NULLS FIRST], false, 0
      :  +- Exchange hashpartitioning(match_id#6571, player_gamertag#6572, 200), ENSURE_REQUIREMENTS, [plan_id=52242]
      :     +- Project [match_id#6571, player_gamertag#6572, player_total_kills#6590, mapid#6608, playlist_id#6610, completion_date#6613]
      :        +- SortMergeJoin [match_id#6571], [match_id#6607], LeftOuter
      :           :- Sort [match_id#6571 ASC NULLS FIRST], false, 0
      :           :  +- Exchange hashpartitioning(match_id#6571, 200), ENSURE_REQUIREMENTS, [plan_id=52233]
      :           :     +- BatchScan demo.bootcamp.match_details_bucketed[match_id#6571, play

In [202]:
# Which player has the highest average kills per game?
# player_match_bucket is joined to medal rows, so one (match, player) pair can
# appear several times. Averaging over those rows would weight every game by
# its number of medal rows, so first collapse to one row per (match, player).
# The cast is explicit because the CSVs were read without a schema.
spark.sql("""
with player_games as (
select
    distinct
    match_id,
    player_gamertag,
    player_total_kills
from player_match_bucket
),
avg_kills as (
select
    player_gamertag,
    avg(cast(player_total_kills as double)) as avg_kills
from player_games
group by 1
)
select
    player_gamertag,
    avg_kills
from avg_kills
order by avg_kills desc limit 1
""").show()

[Stage 2293:>                                                       (0 + 4) / 4]

+---------------+---------+
|player_gamertag|avg_kills|
+---------------+---------+
|   gimpinator14|    109.0|
+---------------+---------+



                                                                                

In [203]:
# Which playlist is played the most?
# The CTE reduces the joined rows to distinct (match_id, playlist_id) pairs so
# each match is counted once; the outer query keeps the playlist with the
# highest number of matches.
spark.sql("""
with group_cte as (
select
    distinct
    match_id,
    playlist_id
from player_match_bucket
)
select
    playlist_id,
    count(1) played_playlist
from group_cte
group by 1
order by 2 desc
limit 1
""").show()

+--------------------+---------------+
|         playlist_id|played_playlist|
+--------------------+---------------+
|f72e0ef0-7c4a-430...|           7657|
+--------------------+---------------+



In [204]:
# Which map is played the most?
# Reduce the joined rows to distinct (match_id, mapid) pairs so each match is
# counted once, then keep the map with the highest number of matches.
# DISTINCT gives the same pairs as keeping row_number() = 1 per
# (match_id, mapid) without needing a window sort.
spark.sql("""
with match_maps as (
select
    distinct
    match_id,
    mapid
from player_match_bucket
)
select
    mapid,
    count(1) as plyed_count
from match_maps
group by 1
order by 2 desc
limit 1
""").show()



+--------------------+-----------+
|               mapid|plyed_count|
+--------------------+-----------+
|c7edbf0f-f206-11e...|       7049|
+--------------------+-----------+



                                                                                

In [205]:
# Which map has the most Killing Spree medals?
# 2430242797 is the medal_id listed as "Killing Spree" in the medals lookup
# shown earlier; filtering on the id avoids relying on medal names.
# NOTE(review): count(1) counts joined rows carrying this medal_id, not the
# number of medals earned. If medals_matches_players has a per-row medal count
# column, summing it may be what the question asks for - confirm.
spark.sql("""
select
    mapid,
    count(1) as killing_spree_count
from player_match_bucket
where medal_id = '2430242797' /* killing spree */
group by 1
order by 2 desc
limit 1
""").show()

25/07/02 20:25:27 WARN DataSourceV2Strategy: Can't translate true to source filter, unsupported expression
[Stage 2343:>                                                       (0 + 4) / 4]

+--------------------+-------------------+
|               mapid|killing_spree_count|
+--------------------+-------------------+
|c7edbf0f-f206-11e...|               6553|
+--------------------+-------------------+



                                                                                

In [None]:
# Write the joined dataset three times with different in-partition sort orders
# so the resulting file sizes can be compared.
#
# The leading sort key decides how values are grouped inside each file. With
# match_id first, the later keys only break ties between rows of the same
# match, and mapid / playlist_id come from the per-match table, so they would
# not change the layout (assumes one mapid and playlist_id per match - confirm).
# The low-cardinality columns therefore lead in the variants below;
# joined_dataset_0 stays sorted by match_id as the baseline.
sort_experiments = {
    "bootcamp.joined_dataset_0": ["match_id"],
    "bootcamp.joined_dataset_1": ["mapid", "match_id"],
    "bootcamp.joined_dataset_2": ["playlist_id", "mapid", "match_id"],
}
for table_name, sort_columns in sort_experiments.items():
    (
        df_bucket
        .sortWithinPartitions(*sort_columns)
        .write.mode("overwrite")
        .saveAsTable(table_name)
    )

In [231]:
%%sql
-- Compare total size and file count of the three sorted copies using each
-- table's "files" metadata table. The label column is aliased so the result
-- header reads "version" instead of the first literal value.
SELECT SUM(file_size_in_bytes) as size, COUNT(1) as num_files, 'version_0' as version
FROM bootcamp.joined_dataset_0.files
UNION ALL
SELECT SUM(file_size_in_bytes) as size, COUNT(1) as num_files, 'version_1' as version
FROM bootcamp.joined_dataset_1.files
UNION ALL
SELECT SUM(file_size_in_bytes) as size, COUNT(1) as num_files, 'version_2' as version
FROM bootcamp.joined_dataset_2.files


Py4JJavaError: An error occurred while calling o36.sql.
: org.apache.iceberg.exceptions.ServiceFailureException: Server error: NotFoundException: Location does not exist: s3://warehouse/bootcamp/joined_dataset_2/metadata/00000-174d7849-4b61-4c1d-926d-59106cbc8051.metadata.json
	at org.apache.iceberg.rest.ErrorHandlers$DefaultErrorHandler.accept(ErrorHandlers.java:217)
	at org.apache.iceberg.rest.ErrorHandlers$TableErrorHandler.accept(ErrorHandlers.java:118)
	at org.apache.iceberg.rest.ErrorHandlers$TableErrorHandler.accept(ErrorHandlers.java:102)
	at org.apache.iceberg.rest.HTTPClient.throwFailure(HTTPClient.java:224)
	at org.apache.iceberg.rest.HTTPClient.execute(HTTPClient.java:308)
	at org.apache.iceberg.rest.BaseHTTPClient.get(BaseHTTPClient.java:77)
	at org.apache.iceberg.rest.RESTClient.get(RESTClient.java:97)
	at org.apache.iceberg.rest.RESTSessionCatalog.loadInternal(RESTSessionCatalog.java:465)
	at org.apache.iceberg.rest.RESTSessionCatalog.loadTable(RESTSessionCatalog.java:489)
	at org.apache.iceberg.catalog.BaseSessionCatalog$AsCatalog.loadTable(BaseSessionCatalog.java:99)
	at org.apache.iceberg.rest.RESTCatalog.loadTable(RESTCatalog.java:102)
	at org.apache.iceberg.shaded.com.github.benmanes.caffeine.cache.BoundedLocalCache.lambda$doComputeIfAbsent$14(BoundedLocalCache.java:2406)
	at java.base/java.util.concurrent.ConcurrentHashMap.compute(ConcurrentHashMap.java:1916)
	at org.apache.iceberg.shaded.com.github.benmanes.caffeine.cache.BoundedLocalCache.doComputeIfAbsent(BoundedLocalCache.java:2404)
	at org.apache.iceberg.shaded.com.github.benmanes.caffeine.cache.BoundedLocalCache.computeIfAbsent(BoundedLocalCache.java:2387)
	at org.apache.iceberg.shaded.com.github.benmanes.caffeine.cache.LocalCache.computeIfAbsent(LocalCache.java:108)
	at org.apache.iceberg.shaded.com.github.benmanes.caffeine.cache.LocalManualCache.get(LocalManualCache.java:62)
	at org.apache.iceberg.CachingCatalog.loadTable(CachingCatalog.java:147)
	at org.apache.iceberg.spark.SparkCatalog.load(SparkCatalog.java:844)
	at org.apache.iceberg.spark.SparkCatalog.loadTable(SparkCatalog.java:169)
	at org.apache.spark.sql.connector.catalog.CatalogV2Util$.getTable(CatalogV2Util.scala:363)
	at org.apache.spark.sql.connector.catalog.CatalogV2Util$.loadTable(CatalogV2Util.scala:337)
	at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveRelations$.$anonfun$resolveRelation$5(Analyzer.scala:1315)
	at scala.Option.orElse(Option.scala:447)
	at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveRelations$.$anonfun$resolveRelation$1(Analyzer.scala:1311)
	at scala.Option.orElse(Option.scala:447)
	at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveRelations$.org$apache$spark$sql$catalyst$analysis$Analyzer$ResolveRelations$$resolveRelation(Analyzer.scala:1296)
	at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveRelations$$anonfun$apply$14.applyOrElse(Analyzer.scala:1153)
	at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveRelations$$anonfun$apply$14.applyOrElse(Analyzer.scala:1117)
	at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.$anonfun$resolveOperatorsUpWithPruning$3(AnalysisHelper.scala:138)
	at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(origin.scala:76)
	at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.$anonfun$resolveOperatorsUpWithPruning$1(AnalysisHelper.scala:138)
	at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper$.allowInvokingTransformsInAnalyzer(AnalysisHelper.scala:323)
	at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.resolveOperatorsUpWithPruning(AnalysisHelper.scala:134)
	at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.resolveOperatorsUpWithPruning$(AnalysisHelper.scala:130)
	at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.resolveOperatorsUpWithPruning(LogicalPlan.scala:32)
	at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.$anonfun$resolveOperatorsUpWithPruning$2(AnalysisHelper.scala:135)
	at org.apache.spark.sql.catalyst.trees.UnaryLike.mapChildren(TreeNode.scala:1216)
	at org.apache.spark.sql.catalyst.trees.UnaryLike.mapChildren$(TreeNode.scala:1215)
	at org.apache.spark.sql.catalyst.plans.logical.Project.mapChildren(basicLogicalOperators.scala:71)
	at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.$anonfun$resolveOperatorsUpWithPruning$1(AnalysisHelper.scala:135)
	at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper$.allowInvokingTransformsInAnalyzer(AnalysisHelper.scala:323)
	at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.resolveOperatorsUpWithPruning(AnalysisHelper.scala:134)
	at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.resolveOperatorsUpWithPruning$(AnalysisHelper.scala:130)
	at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.resolveOperatorsUpWithPruning(LogicalPlan.scala:32)
	at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.$anonfun$resolveOperatorsUpWithPruning$2(AnalysisHelper.scala:135)
	at scala.collection.immutable.List.map(List.scala:297)
	at org.apache.spark.sql.catalyst.trees.TreeNode.mapChildren(TreeNode.scala:699)
	at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.$anonfun$resolveOperatorsUpWithPruning$1(AnalysisHelper.scala:135)
	at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper$.allowInvokingTransformsInAnalyzer(AnalysisHelper.scala:323)
	at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.resolveOperatorsUpWithPruning(AnalysisHelper.scala:134)
	at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.resolveOperatorsUpWithPruning$(AnalysisHelper.scala:130)
	at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.resolveOperatorsUpWithPruning(LogicalPlan.scala:32)
	at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveRelations$.apply(Analyzer.scala:1117)
	at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveRelations$.apply(Analyzer.scala:1076)
	at org.apache.spark.sql.catalyst.rules.RuleExecutor.$anonfun$execute$2(RuleExecutor.scala:222)
	at scala.collection.LinearSeqOptimized.foldLeft(LinearSeqOptimized.scala:126)
	at scala.collection.LinearSeqOptimized.foldLeft$(LinearSeqOptimized.scala:122)
	at scala.collection.immutable.List.foldLeft(List.scala:91)
	at org.apache.spark.sql.catalyst.rules.RuleExecutor.$anonfun$execute$1(RuleExecutor.scala:219)
	at org.apache.spark.sql.catalyst.rules.RuleExecutor.$anonfun$execute$1$adapted(RuleExecutor.scala:211)
	at scala.collection.immutable.List.foreach(List.scala:431)
	at org.apache.spark.sql.catalyst.rules.RuleExecutor.execute(RuleExecutor.scala:211)
	at org.apache.spark.sql.catalyst.analysis.Analyzer.org$apache$spark$sql$catalyst$analysis$Analyzer$$executeSameContext(Analyzer.scala:240)
	at org.apache.spark.sql.catalyst.analysis.Analyzer.$anonfun$execute$1(Analyzer.scala:236)
	at org.apache.spark.sql.catalyst.analysis.AnalysisContext$.withNewAnalysisContext(Analyzer.scala:187)
	at org.apache.spark.sql.catalyst.analysis.Analyzer.execute(Analyzer.scala:236)
	at org.apache.spark.sql.catalyst.analysis.Analyzer.execute(Analyzer.scala:202)
	at org.apache.spark.sql.catalyst.rules.RuleExecutor.$anonfun$executeAndTrack$1(RuleExecutor.scala:182)
	at org.apache.spark.sql.catalyst.QueryPlanningTracker$.withTracker(QueryPlanningTracker.scala:89)
	at org.apache.spark.sql.catalyst.rules.RuleExecutor.executeAndTrack(RuleExecutor.scala:182)
	at org.apache.spark.sql.catalyst.analysis.Analyzer.$anonfun$executeAndCheck$1(Analyzer.scala:223)
	at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper$.markInAnalyzer(AnalysisHelper.scala:330)
	at org.apache.spark.sql.catalyst.analysis.Analyzer.executeAndCheck(Analyzer.scala:222)
	at org.apache.spark.sql.execution.QueryExecution.$anonfun$analyzed$1(QueryExecution.scala:77)
	at org.apache.spark.sql.catalyst.QueryPlanningTracker.measurePhase(QueryPlanningTracker.scala:138)
	at org.apache.spark.sql.execution.QueryExecution.$anonfun$executePhase$2(QueryExecution.scala:219)
	at org.apache.spark.sql.execution.QueryExecution$.withInternalError(QueryExecution.scala:546)
	at org.apache.spark.sql.execution.QueryExecution.$anonfun$executePhase$1(QueryExecution.scala:219)
	at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:900)
	at org.apache.spark.sql.execution.QueryExecution.executePhase(QueryExecution.scala:218)
	at org.apache.spark.sql.execution.QueryExecution.analyzed$lzycompute(QueryExecution.scala:77)
	at org.apache.spark.sql.execution.QueryExecution.analyzed(QueryExecution.scala:74)
	at org.apache.spark.sql.execution.QueryExecution.assertAnalyzed(QueryExecution.scala:66)
	at org.apache.spark.sql.Dataset$.$anonfun$ofRows$2(Dataset.scala:99)
	at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:900)
	at org.apache.spark.sql.Dataset$.ofRows(Dataset.scala:97)
	at org.apache.spark.sql.SparkSession.$anonfun$sql$1(SparkSession.scala:638)
	at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:900)
	at org.apache.spark.sql.SparkSession.sql(SparkSession.scala:629)
	at org.apache.spark.sql.SparkSession.sql(SparkSession.scala:659)
	at jdk.internal.reflect.GeneratedMethodAccessor118.invoke(Unknown Source)
	at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.base/java.lang.reflect.Method.invoke(Method.java:569)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:374)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
	at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
	at java.base/java.lang.Thread.run(Thread.java:840)
