In [None]:
import scala.annotation.tailrec

@tailrec
final def gcd(a: Int, b: Int): Int =   
  if (b == 0) a else gcd(b, a % b)

gcd(4, 12)

In [2]:
@tailrec
final def factorial(n: Int, s: Int = 1): Int =
  if (n == 0) s else factorial(n - 1, s * n)

factorial: (n: Int, s: Int)Int


In [3]:
factorial(5)

120

In [4]:
import org.apache.spark.sql.functions._

val spark = SparkSession.builder
.appName("SparkSessionExample")
.getOrCreate

spark = org.apache.spark.sql.SparkSession@97868c3


org.apache.spark.sql.SparkSession@97868c3

# Схема данных

| customer               |                                                                                                                                                       |
|------------------------|-------------------------------------------------------------------------------------------------------------------------------------------------------|
| customer_id            | Идентификатор клиента                                                                                                                                 |
| product_X              | Статус продукта. OPN - открыт, но не утилизирован. UTL - утилизирован. CLS - закрыт                                                                   |
| gender_cd              | Пол. M - мужской. F - женский                                                                                                                         |
| age                    | Возраст в годах                                                                                                                                       |
| marital_status_cd      | Семейный статус. См. словарь соответствия                                                                                                             |
| children_cnt           | Количество детей в штуках                                                                                                                             |
| first_session_dttm     | Дата и время первой сессии в приложении или личном кабинете на сайте                                                                                  |
| job_position_cd        | Категория занимаемой должности. См. словарь соответствия                                                                                              |
| job_title              | Занимаемая должность                                                                                                                                  |

| stories_reaction_train |                                                                                                                                                       |
|------------------------|-------------------------------------------------------------------------------------------------------------------------------------------------------|
| customer_id            | Идентификатор клиента                                                                                                                                 |
| story_id               | Идентификатор истории                                                                                                                                 |
| event_dttm             | Дата действия                                                                                                                                         |
| event                  | Тип действия. like - лайк или сохранение. view - глубокий просмотр (более 10 секунд). skip - пролистанная история (менее 5 секунд). dislike - дизлайк |

# Читаем CSVшку

In [2]:
import org.apache.spark.rdd._

class SimpleCSVHeader(header:Array[String]) extends Serializable {
  val index = header.zipWithIndex.toMap
  def apply(array:Array[String], key:String) : String = {
      val curIndex = index(key)
      if (curIndex < array.size) {
          return array(curIndex)
      } else {
          return ""
      }
  }
}

val csvCustomer = sc.textFile("Downloads/data_like/customer_train.csv")  // original file
val dataCustomer = csvCustomer.map(line => line.split(",").map(elem => elem.trim)) //lines in rows
val headerCustomer = new SimpleCSVHeader(dataCustomer.first()) // we build our header with the first line
val customers = dataCustomer.filter(line => headerCustomer(line,"customer_id") != "customer_id") // filter the header out

val csvStories = sc.textFile("Downloads/data_like/stories_reaction_train.csv")  // original file
val dataStories = csvStories.map(line => line.split(",").map(elem => elem.trim)) //lines in rows
val headerStories = new SimpleCSVHeader(dataStories.first()) // we build our header with the first line
val stories = dataStories.filter(line => headerStories(line,"customer_id") != "customer_id") // filter the header out

defined class SimpleCSVHeader
csvCustomer = Downloads/data_like/customer_train.csv MapPartitionsRDD[1] at textFile at <console>:29
dataCustomer = MapPartitionsRDD[2] at map at <console>:30
headerCustomer = SimpleCSVHeader@58814f40
customers = MapPartitionsRDD[3] at filter at <console>:32
csvStories = Downloads/data_like/stories_reaction_train.csv MapPartitionsRDD[5] at textFile at <console>:34
dataStories = MapPartitionsRDD[6] at map at <console>:35
headerStories = SimpleCSVHeader@3f912475


stories: org.apache.spark.rdd.RDD[Array[String]...


SimpleCSVHeader@3f912475

In [3]:
headerCustomer.index

Map(product_1 -> 2, first_session_dttm -> 12, customer_id -> 0, product_6 -> 7, job_title -> 14, product_4 -> 5, job_position_cd -> 13, age -> 9, product_5 -> 6, gender_cd -> 8, product_0 -> 1, product_3 -> 4, product_2 -> 3, children_cnt -> 11, marital_status_cd -> 10)

In [4]:
headerStories.index

Map(customer_id -> 0, story_id -> 1, event_dttm -> 2, event -> 3)

In [5]:
customers.take(1)

Array(Array(894436, "", "", "", "", "", UTL, "", M, 30.0, MAR, 0.0, 2018-03-20 09:10:16, 1, Неруководящий сотрудник - обсл. Персонал))

In [6]:
stories.take(1)

Array(Array(15, 138, 2018-07-24 15:33:22, view))

# Задачи
1. Посчитать количество пользователей по каждому типу marital_status, с помощью reduceByKey
2. Посчитать долю пользователей, у которых утилизирован продукт 2, с помощью aggregate
3. Посчитать количество историй 2018-07-24, которые лайкнули преимущественно мужчины
4. Посчитать количество и средний возраст пользователей, посмотревших историю 645 
5. Посчитать количество утилизированных продуктов у каждого пользователя с помощью flatMap
6. Посчитать количество утилизированных продуктов у каждого пользователя с помощью groupByKey

In [17]:
// 1. Посчитать количество пользователей по каждому типу marital_status, с помощью reduceByKey

val byStatus = customers
    .map(s => (headerCustomer(s, "marital_status_cd"), 1))
    .reduceByKey((a, b) => a + b)

byStatus = ShuffledRDD[19] at reduceByKey at <console>:46


ShuffledRDD[19] at reduceByKey at <console>:46

In [18]:
byStatus.collect()

Array(("",18793), (MAR,14199), (CIV,2240), (UNM,11906), (WID,297), (DLW,1), (DIV,2564))

In [10]:
customers.collect()

Array(Array(894436, "", "", "", "", "", UTL, "", M, 30.0, MAR, 0.0, 2018-03-20 09:10:16, 1, Неруководящий сотрудник - обсл. Персонал), Array(524526, "", UTL, "", "", "", UTL, "", F, 20.0, UNM, 0.0, 2017-03-29 20:38:45, 16), Array(498134, "", UTL, "", "", "", "", "", F, 25.0, UNM, 0.0, 2018-03-12 11:25:06, 22), Array(278941, "", "", UTL, CLS, "", UTL, UTL, M, 25.0, "", "", 2016-02-21 18:47:51, 16, Неруководящий сотрудник - специалист), Array(877312, "", UTL, "", "", "", "", "", F, 40.0, MAR, 0.0, 2018-03-07 11:17:02, 22), Array(821806, "", "", "", UTL, "", UTL, "", F, 25.0, "", 0.0, 2018-01-15 09:23:54, 22), Array(782728, "", UTL, "", "", "", "", "", M, 35.0, MAR, 0.0, 2017-12-26 09:02:58, 22), Array(540071, "", "", "", "", "", UTL, "", M, 30.0, "", 0.0, 201...

In [14]:
// 2. Посчитать долю пользователей, у которых утилизирован продукт 2, с помощью aggregate

// def aggregate[U](zeroValue: U)(seqOp: (U, T) ⇒ U, combOp: (U, U) ⇒ U)(implicit arg0: ClassTag[U]): U

// Aggregate the elements of each partition, and then the results for all the partitions, 
// using given combine functions and a neutral "zero value". This function can return a 
// different result type, U, than the type of this RDD, T. Thus, we need one operation 
// for merging a T into an U and one operation for merging two U's, as in scala.TraversableOnce. 
// Both of these functions are allowed to modify and return their first argument instead of creating a new U to avoid memory allocation.


val counters = customers
    .map(s => if (headerCustomer(s, "product_2") == "UTL") 1 else 0)
    .aggregate((0,0))(
        (u, t) => (u._1 + t, u._2 + 1),
        (u1, u2) => (u1._1 + u2._1, u1._2 + u2._2)
    )

counters._1.toDouble / counters._2

counters = (1895,50000)


0.0379

In [17]:
"2018-07-24 15:33:22".slice(0,10)

2018-07-24

In [18]:
stories.count

473141

In [31]:
genderRDD.take(5)

Array((894436,M), (524526,F), (498134,F), (278941,M), (877312,F))

In [36]:
// 3. Посчитать количество историй 2018-07-24, которые лайкнули преимущественно мужчины

val genderRDD = customers
    .map(s => (headerCustomer(s, "customer_id"), headerCustomer(s, "gender_cd")))

stories
    .filter(s => (headerStories(s, "event") == "like") && (headerStories(s, "event_dttm").slice(0,10) == "2018-07-24"))
    .map(s => (headerStories(s, "customer_id"), headerStories(s, "story_id"))) // ~215
    .join(genderRDD)
    .map(s => (s._2._1, if (s._2._2 == "M") 1 else 0))
    .aggregateByKey((0,0))(
        (u, t) => (u._1 + t, u._2 + 1),
        (u1, u2) => (u1._1 + u2._1, u1._2 + u2._2)
    )
    .filter(s => s._2._1 > s._2._2 * 0.5)
    .collect()

genderRDD = MapPartitionsRDD[71] at map at <console>:41


Array((499,(1,2)), (419,(6,7)), (1000010,(0,1)), (1000074,(0,1)), (394,(1,1)), (1000087,(0,1)), (426,(3,3)), (354,(10,11)), (1245,(1,1)), (1256,(0,1)), (459,(0,1)), (1340,(1,1)), (1290,(1,1)), (1311,(27,36)), (312,(1,1)), (293,(1,2)), (1362,(0,1)), (1298,(1,1)), (589,(1,1)), (1324,(0,1)), (1155,(1,1)), (314,(2,3)), (1000102,(1,1)), (745,(1,1)), (448,(3,3)), (480,(1,3)), (1344,(2,2)), (1276,(5,6)), (420,(2,3)), (1087,(1,2)), (482,(1,1)), (846,(1,1)), (347,(0,1)), (534,(1,1)), (1355,(3,4)), (1319,(1,1)), (1359,(7,8)), (956,(3,3)), (445,(0,1)), (1365,(0,1)), (382,(1,2)), (1376,(3,3)), (454,(2,2)), (885,(1,1)), (458,(1,1)), (443,(1,1)), (405,(1,2)), (1110...

In [None]:
join((K, V), (K, W)) => (K, (V,W))

In [None]:
// 4. Посчитать количество и средний возраст пользователей, посмотревших историю 645 

In [None]:
// 5. Посчитать количество утилизированных продуктов у каждого пользователя с помощью flatMap

In [None]:
// 6. Посчитать количество утилизированных продуктов у каждого пользователя с помощью groupByKey

## Dataframe API

In [29]:
val customers = spark.read
    .option("header", true)
    .option("inferSchema", true)
    .csv("Downloads/data_like/customer_train.csv")

val stories = spark.read
    .option("header", true)
    .option("inferSchema", true)
    .csv("Downloads/data_like/stories_reaction_train.csv")

customers = [customer_id: int, product_0: string ... 13 more fields]
stories = [customer_id: int, story_id: int ... 2 more fields]


[customer_id: int, story_id: int ... 2 more fields]

In [51]:
// 1. Посчитать количество пользователей по каждому типу marital_status, с помощью reduceByKey

customers
    .groupBy("marital_status_cd")
    .agg(count("*") as "count")
    .show

+-----------------+-----+
|marital_status_cd|count|
+-----------------+-----+
|             null|18793|
|              DLW|    1|
|              WID|  297|
|              UNM|11906|
|              MAR|14199|
|              CIV| 2240|
|              DIV| 2564|
+-----------------+-----+



In [52]:
// 2. Посчитать долю пользователей, у которых утилизирован продукт 2
customers
    .groupBy("product_2")
    .agg(count("*") as "count")
    .show

+---------+-----+
|product_2|count|
+---------+-----+
|     null|45609|
|      UTL| 1895|
|      CLS|  750|
|      OPN| 1746|
+---------+-----+



In [58]:
customers
    .select(
        $"product_2",
        coalesce($"product_2", "NULL") as "product_2_cleaned"
    )
    .show

Name: Unknown Error
Message: <console>:37: error: type mismatch;
 found   : String("NULL")
 required: org.apache.spark.sql.Column
               coalesce($"product_2", "NULL") as "product_2_cleaned"
                                      ^

StackTrace: 

In [64]:
customers
    .rollup(coalesce($"product_2", lit("NULL")) as "product_2_cleaned")
    .agg(count("*") as "count")
    .show

+-----------------+-----+
|product_2_cleaned|count|
+-----------------+-----+
|             NULL|45609|
|             null|50000|
|              UTL| 1895|
|              OPN| 1746|
|              CLS|  750|
+-----------------+-----+



In [66]:
stories.printSchema

root
 |-- customer_id: integer (nullable = true)
 |-- story_id: integer (nullable = true)
 |-- event_dttm: timestamp (nullable = true)
 |-- event: string (nullable = true)



In [71]:
// 3. Посчитать количество историй 2018-07-24, которые лайкнули преимущественно мужчины

val genders = customers
    // .select("customer_id", "gender_cd")

stories
    .filter($"event" === "like" && to_date($"event_dttm","yyyy-MM-dd") === "2018-07-24")
    .join(genders, Seq("customer_id"), "inner")
    .groupBy("story_id")
    .agg(
        count("*") as "total_likes",
        sum(when($"gender_cd" === "M", lit(1)).otherwise(lit(0))) as "mens_likes"
    )
    .filter($"mens_likes" > $"total_likes" * 0.5)
    .show

+--------+-----------+----------+
|story_id|total_likes|mens_likes|
+--------+-----------+----------+
|     458|          1|         1|
| 1000088|          1|         1|
|    1344|          2|         2|
|    1276|          6|         5|
| 1000102|          1|         1|
|    1290|          1|         1|
|    1311|         36|        27|
|    1145|          1|         1|
| 1000109|          2|         2|
|    1155|          1|         1|
|     308|          2|         2|
|     470|          1|         1|
|     692|          1|         1|
|    1245|          1|         1|
|    1355|          4|         3|
|     676|          1|         1|
|     846|          1|         1|
|     959|          1|         1|
|     836|          1|         1|
|     419|          7|         6|
+--------+-----------+----------+
only showing top 20 rows



genders = [customer_id: int, product_0: string ... 13 more fields]


[customer_id: int, product_0: string ... 13 more fields]

Задачи на DataFrame API:

1. Вывести количество пользователей, которые не посмотрели ни одной истории
2. Найти пользователя-мужчину с максимальным количеством просмотров
3. Посчитать количество историй, которые пользователи смотрят с момента первого просмотра
4. Rollup vs Cube: сделать пример аналитического куба
5. Написать 2 задачу на spark sql

In [26]:
sc

org.apache.spark.SparkContext@38166e42

In [27]:
sc.uiWebUrl

Some(http://192.168.1.65:4040)

In [30]:
stories.printSchema
customers.printSchema

root
 |-- customer_id: integer (nullable = true)
 |-- story_id: integer (nullable = true)
 |-- event_dttm: timestamp (nullable = true)
 |-- event: string (nullable = true)

root
 |-- customer_id: integer (nullable = true)
 |-- product_0: string (nullable = true)
 |-- product_1: string (nullable = true)
 |-- product_2: string (nullable = true)
 |-- product_3: string (nullable = true)
 |-- product_4: string (nullable = true)
 |-- product_5: string (nullable = true)
 |-- product_6: string (nullable = true)
 |-- gender_cd: string (nullable = true)
 |-- age: double (nullable = true)
 |-- marital_status_cd: string (nullable = true)
 |-- children_cnt: double (nullable = true)
 |-- first_session_dttm: timestamp (nullable = true)
 |-- job_position_cd: integer (nullable = true)
 |-- job_title: string (nullable = true)



In [32]:
// 1. Вывести количество пользователей, которые не посмотрели ни одной истории
customers.join(stories.select("customer_id"), Seq("customer_id"), "leftanti").count

12177

In [None]:
val customer_ids = broadcast(stories.select("customer_id").repartition(200))

In [33]:
customers.repartition(200).join(customer_ids, Seq("customer_id"), "leftanti").count

12177

In [34]:
sc.uiWebUrl

Some(http://192.168.1.65:4040)

In [None]:
broadcast

In [35]:
stories.printSchema
customers.printSchema

root
 |-- customer_id: integer (nullable = true)
 |-- story_id: integer (nullable = true)
 |-- event_dttm: timestamp (nullable = true)
 |-- event: string (nullable = true)

root
 |-- customer_id: integer (nullable = true)
 |-- product_0: string (nullable = true)
 |-- product_1: string (nullable = true)
 |-- product_2: string (nullable = true)
 |-- product_3: string (nullable = true)
 |-- product_4: string (nullable = true)
 |-- product_5: string (nullable = true)
 |-- product_6: string (nullable = true)
 |-- gender_cd: string (nullable = true)
 |-- age: double (nullable = true)
 |-- marital_status_cd: string (nullable = true)
 |-- children_cnt: double (nullable = true)
 |-- first_session_dttm: timestamp (nullable = true)
 |-- job_position_cd: integer (nullable = true)
 |-- job_title: string (nullable = true)



In [45]:
// 2. Найти пользователя-мужчину с максимальным количеством просмотров
val males = customers.filter($"gender_cd" === "M").select("customer_id")

stories
    .join(males, Seq("customer_id"), "inner")
    .groupBy("customer_id")
    .agg(count("*") as "num_previews")
    .orderBy(-$"num_previews")
    .take(1)

males = [customer_id: int]


Array([920088,430])

In [42]:
stories
    .join(males, Seq("customer_id"), "inner")
    .groupBy("customer_id")
    .agg(count("*") as "num_previews")
    .agg(max(struct($"num_previews", $"customer_id")) as "argmax_struct")
    .select("argmax_struct.customer_id")
    .show(1)

+-----------+
|customer_id|
+-----------+
|     920088|
+-----------+



https://sparkbyexamples.com/spark/spark-convert-string-to-timestamp-format/ 

In [24]:
import spark.sqlContext.implicits._

//String to timestamps
val df = Seq(
    ("2019-07-01 12:01:19.000"),
    ("2019-06-24 12:01:19.000"),
    ("2019-11-16 16:44:55.406"),
    ("2019-11-16 16:50:59.406")
  )
  .toDF("input_timestamp")
  .withColumn("datetype_timestamp", to_timestamp(col("input_timestamp")))

df.printSchema()

df.show(false)

root
 |-- input_timestamp: string (nullable = true)
 |-- datetype_timestamp: timestamp (nullable = true)

+-----------------------+-----------------------+
|input_timestamp        |datetype_timestamp     |
+-----------------------+-----------------------+
|2019-07-01 12:01:19.000|2019-07-01 12:01:19    |
|2019-06-24 12:01:19.000|2019-06-24 12:01:19    |
|2019-11-16 16:44:55.406|2019-11-16 16:44:55.406|
|2019-11-16 16:50:59.406|2019-11-16 16:50:59.406|
+-----------------------+-----------------------+



df = [input_timestamp: string, datetype_timestamp: timestamp]


[input_timestamp: string, datetype_timestamp: timestamp]

In [46]:
stories.printSchema
customers.printSchema

root
 |-- customer_id: integer (nullable = true)
 |-- story_id: integer (nullable = true)
 |-- event_dttm: timestamp (nullable = true)
 |-- event: string (nullable = true)

root
 |-- customer_id: integer (nullable = true)
 |-- product_0: string (nullable = true)
 |-- product_1: string (nullable = true)
 |-- product_2: string (nullable = true)
 |-- product_3: string (nullable = true)
 |-- product_4: string (nullable = true)
 |-- product_5: string (nullable = true)
 |-- product_6: string (nullable = true)
 |-- gender_cd: string (nullable = true)
 |-- age: double (nullable = true)
 |-- marital_status_cd: string (nullable = true)
 |-- children_cnt: double (nullable = true)
 |-- first_session_dttm: timestamp (nullable = true)
 |-- job_position_cd: integer (nullable = true)
 |-- job_title: string (nullable = true)



In [54]:
// 3. Посчитать количество просмотров историй, которые пользователи смотрят за первые 3 дня пользования начиная с первой сессии
val storiesWithTime2 = stories
    .select($"customer_id", to_timestamp($"event_dttm") as "event_dttm")

val joined2 = customers
    .select($"customer_id", to_timestamp($"first_session_dttm") as "first_session_dttm")
    .join(storiesWithTime2, Seq("customer_id"))
    .cache

joined2
    .withColumn("datediff_days", datediff($"event_dttm", $"first_session_dttm"))
    .filter($"datediff_days" <= 3)
    .count

storiesWithTime2 = [customer_id: int, event_dttm: timestamp]
joined2 = [customer_id: int, first_session_dttm: timestamp ... 1 more field]


1847

In [55]:
joined2
    .withColumn("datediff_days", datediff($"event_dttm", $"first_session_dttm"))
    .filter($"datediff_days" <= 6)
    .count

3204

### Window functions

In [59]:
import spark.implicits._
import org.apache.spark.sql._
import org.apache.spark.sql.expressions._
import org.apache.spark.sql.functions._

case class Salary(depName: String, empNo: Long, salary: Long)
val empsalary = Seq(
  Salary("sales", 1, 5000),
  Salary("personnel", 2, 3900),
  Salary("sales", 3, 4800),
  Salary("sales", 4, 4800),
  Salary("personnel", 5, 3500),
  Salary("develop", 7, 4200),
  Salary("develop", 8, 6000),
  Salary("develop", 9, 4500),
  Salary("develop", 10, 5200),
  Salary("develop", 11, 5200)).toDF

empsalary.show

+---------+-----+------+
|  depName|empNo|salary|
+---------+-----+------+
|    sales|    1|  5000|
|personnel|    2|  3900|
|    sales|    3|  4800|
|    sales|    4|  4800|
|personnel|    5|  3500|
|  develop|    7|  4200|
|  develop|    8|  6000|
|  develop|    9|  4500|
|  develop|   10|  5200|
|  develop|   11|  5200|
+---------+-----+------+



defined class Salary
empsalary = [depName: string, empNo: bigint ... 1 more field]


[depName: string, empNo: bigint ... 1 more field]

In [65]:
import org.apache.spark.sql.expressions.Window

empsalary
    .withColumn("avg", avg($"salary").over(Window.partitionBy($"depName")))
    .withColumn("sum_by_empNo", sum($"salary").over(Window.partitionBy($"depName").orderBy("empNo")))
    .show

+---------+-----+------+-----------------+------------+
|  depName|empNo|salary|              avg|sum_by_empNo|
+---------+-----+------+-----------------+------------+
|  develop|    7|  4200|           5020.0|        4200|
|  develop|    8|  6000|           5020.0|       10200|
|  develop|    9|  4500|           5020.0|       14700|
|  develop|   10|  5200|           5020.0|       19900|
|  develop|   11|  5200|           5020.0|       25100|
|    sales|    1|  5000|4866.666666666667|        5000|
|    sales|    3|  4800|4866.666666666667|        9800|
|    sales|    4|  4800|4866.666666666667|       14600|
|personnel|    2|  3900|           3700.0|        3900|
|personnel|    5|  3500|           3700.0|        7400|
+---------+-----+------+-----------------+------------+



In [70]:
stories.show(5)

+-----------+--------+-------------------+-----+
|customer_id|story_id|         event_dttm|event|
+-----------+--------+-------------------+-----+
|         15|     138|2018-07-24 15:33:22| view|
|         15|     202|2018-06-04 08:08:08| skip|
|         15|     222|2018-06-17 13:44:45| skip|
|         15|     379|2018-05-23 05:41:43| skip|
|         15|     544|2018-07-25 02:16:29| view|
+-----------+--------+-------------------+-----+
only showing top 5 rows



In [76]:
// 3a. Посчитать количество историй, которые пользователи смотрят за первые 3 дня пользования начиная с первого просмотра историй

val storiesWithTime = stories
    .select($"customer_id", to_timestamp($"event_dttm") as "event_dttm")
    .withColumn("first_story_dttm", min("event_dttm").over(Window.partitionBy("customer_id")))

    .withColumn("datediff_days", datediff($"event_dttm", $"first_story_dttm"))
    .filter($"datediff_days" <= 3)
    .count

storiesWithTime = 151464


151464

### Cache

df.cache – создаёт кэш для всех вычислений, предшествующих этому

In [None]:
// 4. Rollup vs Cube: пример аналитического куба

In [None]:
// 5. Написать 2 задачу на spark sql

In [None]:
// 2. Найти пользователя-мужчину с максимальным количеством просмотров
val males = customers.filter($"gender_cd" === "M").select("customer_id")

stories
    .join(males, Seq("customer_id"), "inner")
    .groupBy("customer_id")
    .agg(count("*") as "num_previews")
    .orderBy(-$"num_previews")
    .take(1)

In [78]:
stories.createOrReplaceTempView("stories")
customers.createOrReplaceTempView("customers")

In [80]:
spark.sql("""
    select * 
    from stories join customers
    where customers.gender_cd = "M"
    limit 10
""").show

+-----------+--------+-------------------+-----+
|customer_id|story_id|         event_dttm|event|
+-----------+--------+-------------------+-----+
|         15|     138|2018-07-24 15:33:22| view|
|         15|     202|2018-06-04 08:08:08| skip|
|         15|     222|2018-06-17 13:44:45| skip|
|         15|     379|2018-05-23 05:41:43| skip|
|         15|     544|2018-07-25 02:16:29| view|
|         15|     610|2018-07-29 01:49:35| view|
|         15|     645|2018-07-24 15:33:22| view|
|         15|     687|2018-05-23 05:41:43| view|
|         15|     770|2018-06-04 08:08:08| skip|
|         15|     781|2018-06-03 09:24:45| view|
+-----------+--------+-------------------+-----+

