In [1]:
%%help

Magic,Example,Explanation
info,%%info,Outputs session information for the current Livy endpoint.
cleanup,%%cleanup -f,"Deletes all sessions for the current Livy endpoint, including this notebook's session. The force flag is mandatory."
delete,%%delete -f -s 0,Deletes a session by number for the current Livy endpoint. Cannot delete this kernel's session.
logs,%%logs,Outputs the current session's Livy logs.
configure,"%%configure -f {""executorMemory"": ""1000M"", ""executorCores"": 4}",Configure the session creation parameters. The force flag is mandatory if a session has already been  created and the session will be dropped and recreated. Look at Livy's POST /sessions Request Body for a list of valid parameters. Parameters must be passed in as a JSON string.
spark,%%spark -o df df = spark.read.parquet('...,"Executes spark commands.  Parameters:  -o VAR_NAME: The Spark dataframe of name VAR_NAME will be available in the %%local Python context as a  Pandas dataframe with the same name.  -m METHOD: Sample method, either take or sample.  -n MAXROWS: The maximum number of rows of a dataframe that will be pulled from Livy to Jupyter.  If this number is negative, then the number of rows will be unlimited.  -r FRACTION: Fraction used for sampling."
sql,%%sql -o tables -q SHOW TABLES,"Executes a SQL query against the variable sqlContext (Spark v1.x) or spark (Spark v2.x).  Parameters:  -o VAR_NAME: The result of the SQL query will be available in the %%local Python context as a  Pandas dataframe.  -q: The magic will return None instead of the dataframe (no visualization).  -m, -n, -r are the same as the %%spark parameters above."
local,%%local a = 1,All the code in subsequent lines will be executed locally. Code must be valid Python code.


In [2]:
%reload_ext sparkmagic.magics

In [3]:
%%spark config
{"conf": {"spark.jars.packages": "org.postgresql:postgresql:42.2.5"}}

In [4]:
%spark add -s test -l scala -u http://192.168.1.114:8998/ -k

Starting Spark application


ID,YARN Application ID,Kind,State,Spark UI,Driver log,Current session?
22,,spark,idle,,,✔


SparkSession available as 'spark'.


In [25]:
%spark cleanup test

In [12]:
%%local
import julia.magic
julia.magic.load_ipython_extension(get_ipython())

Initializing Julia interpreter. This may take some time...


In [2]:
%%local
ip = get_ipython()

In [41]:
spark.conf.getAll

res1: Map[String,String] = Map(spark.yarn.dist.archives -> file:/opt/apache-spark/R/lib/sparkr.zip#sparkr, spark.driver.host -> localhost, spark.livy.spark_major_version -> 2, spark.driver.port -> 39303, spark.repl.class.uri -> spark://localhost:39303/classes, spark.jars -> file:///opt/apache-livy/rsc-jars/livy-api-0.5.0-incubating.jar,file:///opt/apache-livy/rsc-jars/netty-all-4.0.37.Final.jar,file:///opt/apache-livy/rsc-jars/livy-rsc-0.5.0-incubating.jar,file:///opt/apache-livy/repl_2.11-jars/commons-codec-1.9.jar,file:///opt/apache-livy/repl_2.11-jars/livy-core_2.11-0.5.0-incubating.jar,file:///opt/apache-livy/repl_2.11-jars/livy-repl_2.11-0.5.0-incubating.jar,file:///var/lib/apache-livy/.ivy2/jars/org.postgresql_postgresql-42.2.5.jar, spark.repl.class.outputDir -> /tmp/spark88490917...

In [5]:
object spark_read_db {
    val opts = Map("url" -> "jdbc:postgresql://localhost:5733/j3", "driver" -> "org.postgresql.Driver")
    def table(dbtable:String) =
        spark.read.format("jdbc").options(opts + ("dbtable" -> dbtable))
}

defined object spark_read_db


In [8]:
val items = spark_read_db.table("items").load
items.printSchema
items.select("tag").distinct().show()
val matches = spark_read_db.table("match_3c.matches").load
matches.printSchema
val scores = spark_read_db.table("scores").load
scores.printSchema
val role_kungfus = spark_read_db.table("role_kungfus").load
role_kungfus.printSchema
val roles = spark_read_db.table("roles").load
roles.printSchema
val match_roles = spark_read_db.table("match_3c.match_roles").load
match_roles.printSchema

items: org.apache.spark.sql.DataFrame = [tag: string, id: string ... 3 more fields]
root
 |-- tag: string (nullable = true)
 |-- id: string (nullable = true)
 |-- content: string (nullable = true)
 |-- inserted_at: timestamp (nullable = true)
 |-- updated_at: timestamp (nullable = true)

+------------+
|         tag|
+------------+
|       equip|
|metric_names|
|      talent|
|  attr_names|
|     version|
|      kungfu|
+------------+

matches: org.apache.spark.sql.DataFrame = [match_id: bigint, start_time: int ... 11 more fields]
root
 |-- match_id: long (nullable = true)
 |-- start_time: integer (nullable = true)
 |-- duration: integer (nullable = true)
 |-- pvp_type: integer (nullable = true)
 |-- map: integer (nullable = true)
 |-- grade: integer (nullable = true)
 |-- team1_score: integer (nullable = true)
 |-- team2_score: integer (nullable = true)
 |-- team1_kungfu: array (nullable = true)
 |    |-- element: integer (containsNull = true)
 |-- team2_kungfu: array (nullable = true

In [35]:
%%spark -o df
val df = matches.groupBy("grade").
  agg(
    count("match_id").alias("count"),
    bround(avg("duration"), 2).alias("duration")).
  sort("grade")

df: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [grade: int, count: bigint ... 1 more field]


In [36]:
%%local
df

VBox(children=(HBox(children=(HTML(value='Type:'), Button(description='Table', layout=Layout(width='70px'), st…

Output()

In [15]:
%%spark -o kungfu_items
val kungfu_cn_map = Map(
    "xisui" -> ("洗髓", "洗"),
    "yijin" -> ("易经", "秃"),
    "zixia" -> ("紫霞", "气"),
    "taixu" -> ("太虚", "剑"),
    "huajian" -> ("花间", "花"),
    "aoxue" -> ("傲血", "策"),
    "lijing" -> ("离经", "离"),
    "tielao" -> ("铁牢", "铁"),
    "yunshang" -> ("云裳", "秀"),
    "bingxin" -> ("冰心", "冰"),
    "cangjian" -> ("藏剑", "藏"),
    "dujing" -> ("毒经", "毒"),
    "butian" -> ("补天", "补"),
    "jingyu" -> ("惊羽", "鲸"),
    "tianluo" -> ("天罗", "螺"),
    "fenying" -> ("焚影", "明"),
    "mingzun" -> ("明尊", "喵"),
    "xiaochen" -> ("笑尘", "丐"),
    "tiegu" -> ("铁骨", "骨"),
    "fenshan" -> ("分山", "苍"),
    "mowen" -> ("莫问", "莫"),
    "xiangzhi" -> ("相知", "歌"),
    "beiao" -> ("北傲", "霸")
)
val kungfu_items = items.filter("tag == 'kungfu'").
    select($"id".cast("int"), trim($"content", "\"").alias("content")).
    join(kungfu_cn_map.toList.map { case (a, (b, c)) => (a,b,c) }.toDF("content", "kungfu", "short"), Seq("content"), "left_outer")
kungfu_items.show(100)

kungfu_cn_map: scala.collection.immutable.Map[String,(String, String)] = Map(jingyu -> (惊羽,鲸), yunshang -> (云裳,秀), tiegu -> (铁骨,骨), lijing -> (离经,离), xisui -> (洗髓,洗), fenying -> (焚影,明), xiangzhi -> (相知,歌), taixu -> (太虚,剑), yijin -> (易经,秃), mingzun -> (明尊,喵), cangjian -> (藏剑,藏), tielao -> (铁牢,铁), aoxue -> (傲血,策), fenshan -> (分山,苍), mowen -> (莫问,莫), xiaochen -> (笑尘,丐), butian -> (补天,补), zixia -> (紫霞,气), beiao -> (北傲,霸), bingxin -> (冰心,冰), dujing -> (毒经,毒), tianluo -> (天罗,螺), huajian -> (花间,花))
kungfu_items: org.apache.spark.sql.DataFrame = [content: string, id: int ... 2 more fields]
+--------+-----+------+-----+
| content|   id|kungfu|short|
+--------+-----+------+-----+
|   xisui|10002|    洗髓|    洗|
|   yijin|10003|    易经|    秃|
|   zixia|10014|    紫霞|    气|
|   taixu|10015|    太虚|    剑|
| huajian|10021|    花间|    花|
|   aoxue|10026|    傲血|    策|
|  lijing|10028|    离经|    离|
|  tielao|10062|    铁牢|    铁|
|yunshang|10080|    云裳|    秀|
| bingxin|10081|    冰心|    冰|
|cangjian|10145|    藏

In [45]:
import org.apache.spark.sql.DataFrame
import scala.collection.JavaConverters._
def get_kungfus(matches:DataFrame) = {
    matches.selectExpr("team1_kungfu as kungfu", "winner=1 as won").
        union(matches.selectExpr("team2_kungfu as kungfu", "winner=2 as won"))
}
object kungfu_sort_fun {
    val kungfu_order = Seq(
        "taixu",
        "zixia", "bingxin", "fenying", "yijin", "huajian", "dujing", "tianluo",  "mowen",
        "aoxue", "fenshan", "cangjian", "jingyu", "xiaochen", "beiao",
        "xisui", "tielao", "tiegu", "mingzun",
        "lijing", "yunshang", "butian", "xiangzhi"
    ).zipWithIndex.toMap
    val kungfu_order_sp = Set(("jingyu", "tianluo"))
    def call(a:String, b:String) : Boolean = {
        if (kungfu_order_sp.contains((a, b))) {
            return true
        }
        if (kungfu_order_sp.contains((b, a))) {
            return false
        }
        return kungfu_order(a) < kungfu_order(b)
    }
}
def show_kungfu(kungfus:DataFrame, n:Int) = {
    val kungfu_map = kungfu_items.select("id", "content").takeAsList(100).asScala.map(i => (i.getAs[Int](0), i.getAs[String](1))).toMap
    val kungfu_short_map = kungfu_items.select("content", "short").takeAsList(100).asScala.map(i => (i.getAs[String](0), i.getAs[String](1))).toMap
    kungfus.takeAsList(n).asScala.map(
        i => (
            i.getAs[Seq[Int]](0).map(kungfu_map).sortWith(kungfu_sort_fun.call).map(kungfu_short_map).mkString,
            i.getAs[Long](1)+1,
            i.getAs[Long](2),
            i.getAs[Long](2).toFloat/i.getAs[Long](1)
        )
    )
}

import org.apache.spark.sql.DataFrame
import scala.collection.JavaConverters._
get_kungfus: (matches: org.apache.spark.sql.DataFrame)org.apache.spark.sql.Dataset[org.apache.spark.sql.Row]
defined object kungfu_sort_fun
show_kungfu: (kungfus: org.apache.spark.sql.DataFrame, n: Int)scala.collection.mutable.Buffer[(String, Long, Long, Float)]


In [34]:
def kungfu_sort(grade:Int) =
    get_kungfus(matches.filter($"grade" === grade)).groupBy("kungfu").
    agg(
        count(lit(1)).alias("count"),
        count(when($"won" === true, true)).alias("won")
    ).sort($"won".desc);
val kungfu_sort_12_result = show_kungfu(kungfu_sort(12), 1000)
val kungfu_sort_13_result = show_kungfu(kungfu_sort(13), 1000)

kungfu_sort: (grade: Int)org.apache.spark.sql.Dataset[org.apache.spark.sql.Row]
kungfu_sort_12_result: scala.collection.mutable.Buffer[(String, Long, Long, Float)] = ArrayBuffer((策藏秀,73808,38945,0.52765995), (鲸丐秀,40488,23150,0.5717885), (剑藏秀,39843,22674,0.56909794), (气花离,36285,20414,0.5626171), (剑霸秀,32083,18247,0.5687613), (策藏补,32202,16019,0.497469), (毒毒补,26732,15320,0.5731174), (剑霸补,28847,15037,0.5212855), (策鲸秀,27111,14613,0.5390262), (明莫补,24493,14242,0.581496), (策霸秀,29065,14213,0.48902422), (策霸补,27228,13346,0.4901752), (气花补,20359,11201,0.5502014), (剑丐秀,21118,11180,0.5294313), (剑藏补,20990,10766,0.51293534), (秃螺补,17195,10226,0.59474236), (剑策秀,18069,9348,0.5173788), (苍霸补,17435,8990,0.51565903), (秃霸补,15332,8534,0.5566499), (策苍补,16664,8063,0.48388645), (策苍秀,16161,7826,0.48428217), (苍藏秀,15268,7637,0.50022924), (策鲸补,15821,7460,0.471555), (冰毒补,13250,7357,0.5552872), (毒莫补,...kungfu_sort_13_result: scala.collection.mutable.Buffer[(String, Long, Long, Float)] = ArrayBuffer((鲸丐秀,18952,10080,0.531

In [10]:
kungfu_sort_13_result.sortBy(-_._2).take(100).foreach(println)

(策藏秀,19332,9611,0.4971807)
(鲸丐秀,18946,10076,0.53185534)
(剑藏秀,16919,9002,0.53209597)
(剑霸秀,14089,7610,0.54017603)
(毒毒补,13927,8064,0.57906073)
(气花离,11196,6023,0.53800803)
(秃螺补,10226,5972,0.5840587)
(明莫补,9029,5077,0.56236154)
(策鲸秀,7494,3495,0.46643534)
(剑霸补,7493,3765,0.50253606)
(剑丐秀,6971,3469,0.49770445)
(秃霸补,6933,3862,0.55712634)
(毒螺补,5013,2740,0.54668796)
(气花补,4420,2318,0.52455306)
(剑藏补,4136,1879,0.45441353)
(冰毒补,4095,2229,0.5444553)
(冰鲸秀,4052,2357,0.58183163)
(冰莫歌,3864,2401,0.6215377)
(毒莫补,3747,1969,0.5256273)
(明莫歌,3746,2084,0.5564753)
(剑策秀,3746,1759,0.46969292)
(苍霸补,3616,1982,0.5482711)
(气花秀,3257,1816,0.55773956)
(气鲸离,3186,1739,0.54599684)
(策藏补,2908,1283,0.44134846)
(策霸秀,2893,1208,0.41770402)
(秃秃补,2875,1646,0.57272094)
(策霸补,2789,1290,0.46269727)
(秃螺秀,2763,1482,0.5365677)
(藏鲸秀,2759,1476,0.53517044)
(冰鲸离,2698,1436,0.53244346)
(剑气离,2547,1286,0.50510603)
(苍藏秀,2375,1189,0.50084245)
(气明秀,2197,1359,0.61885244)
(秃霸秀,2156,1103,0.51183295)
(苍霸秀,2117,1062,0.50189036)
(剑丐补,1943,848,0.43666324)
(明

In [11]:
import org.apache.spark.sql.DataFrame
object Helpers {
implicit class DataFrameHist(df:DataFrame) {
    def mkHistogram(n_bins: Int) = {
        val (startValues, counts) = df.rdd.map(_.getInt(0)).histogram(n_bins)
        (startValues.toList, counts.toList)
    }
}
}
import Helpers._

import org.apache.spark.sql.DataFrame
defined object Helpers
import Helpers._


In [12]:
scores.filter($"match_type"==="3c").filter($"score">2000).select("score").mkHistogram(10)

res10: (List[Double], List[Long]) = (List(2001.0, 2098.6, 2196.2, 2293.8, 2391.4, 2489.0, 2586.6, 2684.2, 2781.8, 2879.4, 2977.0),List(22807, 42267, 43341, 38937, 14724, 2774, 957, 281, 65, 14))


In [47]:
%%spark -o df_ranking_number
val df_ranking_number = scores.filter($"match_type"==="3c").filter($"ranking"<0).groupBy("ranking").count()

df_ranking_number: org.apache.spark.sql.DataFrame = [ranking: int, count: bigint]


In [48]:
%%local
df_ranking_number

VBox(children=(HBox(children=(HTML(value='Type:'), Button(description='Table', layout=Layout(width='70px'), st…

Output()

In [58]:
%%spark -o kungfu_weight_10
val roles_fileterd_10 = scores.filter($"match_type"==="3c").filter("score>2100").join(role_kungfus.withColumnRenamed("total_count", "kungfu_count"), Seq("role_id", "match_type"))
val kungfu_weight_10 = roles_fileterd_10.select("role_id", "kungfu", "kungfu_count", "total_count").withColumn("kungfu_weight", $"kungfu_count"/$"total_count").groupBy("kungfu").agg(bround(sum("kungfu_weight"), 2).alias("weight")).sort($"weight".desc)

roles_fileterd_10: org.apache.spark.sql.DataFrame = [role_id: string, match_type: string ... 21 more fields]
kungfu_weight_10: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [kungfu: string, weight: double]


In [57]:
%%spark -o kungfu_weight_9
val roles_fileterd_9 = scores.filter($"match_type"==="3c").filter("score>2000").join(role_kungfus.withColumnRenamed("total_count", "kungfu_count"), Seq("role_id", "match_type"))
val kungfu_weight_9 = roles_fileterd_9.select("role_id", "kungfu", "kungfu_count", "total_count").withColumn("kungfu_weight", $"kungfu_count"/$"total_count").groupBy("kungfu").agg(bround(sum("kungfu_weight"), 2).alias("weight")).sort($"weight".desc)

roles_fileterd_9: org.apache.spark.sql.DataFrame = [role_id: string, match_type: string ... 21 more fields]
kungfu_weight_9: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [kungfu: string, weight: double]


In [56]:
%%spark -o kungfu_weight
val roles_fileterd = scores.filter($"match_type"==="3c").filter("score>2400").join(role_kungfus.withColumnRenamed("total_count", "kungfu_count"), Seq("role_id", "match_type"))
val kungfu_weight = roles_fileterd.select("role_id", "kungfu", "kungfu_count", "total_count").withColumn("kungfu_weight", $"kungfu_count"/$"total_count").groupBy("kungfu").agg(bround(sum("kungfu_weight"), 2).alias("weight")).sort($"weight".desc)

roles_fileterd: org.apache.spark.sql.DataFrame = [role_id: string, match_type: string ... 21 more fields]
kungfu_weight: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [kungfu: string, weight: double]


In [55]:
%%spark -o kungfu_weight_3000
val roles_fileterd_3000 = scores.filter($"match_type"==="3c").filter("ranking>=0").join(role_kungfus.withColumnRenamed("total_count", "kungfu_count"), Seq("role_id", "match_type"))
val kungfu_weight_3000 = roles_fileterd_3000.select("role_id", "kungfu", "kungfu_count", "total_count").withColumn("kungfu_weight", $"kungfu_count"/$"total_count").groupBy("kungfu").agg(bround(sum("kungfu_weight"), 2).alias("weight")).sort($"weight".desc)

roles_fileterd_3000: org.apache.spark.sql.DataFrame = [role_id: string, match_type: string ... 21 more fields]
kungfu_weight_3000: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [kungfu: string, weight: double]


In [29]:
%%local
kungfu_cn_map = {i['content']: i['kungfu'] for i in kungfu_items[["content", "kungfu"]].to_dict('records')}

In [59]:
%%local
kungfu_weight["weight"].sum(), kungfu_weight_10["weight"].sum(), kungfu_weight_9["weight"].sum(), kungfu_weight_3000["weight"].sum()

(14572.620000000003, 95175.95000000001, 112727.84, 2573.19)

In [60]:
%%local
kungfu_weight.assign(kungfu_cn=[kungfu_cn_map[x] for x in kungfu_weight['kungfu']])

VBox(children=(HBox(children=(HTML(value='Type:'), Button(description='Table', layout=Layout(width='70px'), st…

Output()

In [64]:
%%local
kungfu_weight_3000.assign(kungfu_cn=[kungfu_cn_map[x] for x in kungfu_weight_3000['kungfu']])

VBox(children=(HBox(children=(HTML(value='Type:'), Button(description='Table', layout=Layout(width='70px'), st…

Output()

In [61]:
%%local
kungfu_weight_10.assign(kungfu_cn=[kungfu_cn_map[x] for x in kungfu_weight_10['kungfu']])

VBox(children=(HBox(children=(HTML(value='Type:'), Button(description='Table', layout=Layout(width='70px'), st…

Output()

In [62]:
%%local
kungfu_weight_9.assign(kungfu_cn=[kungfu_cn_map[x] for x in kungfu_weight_9['kungfu']])

VBox(children=(HBox(children=(HTML(value='Type:'), Button(description='Table', layout=Layout(width='70px'), st…

Output()