### What's wrong with RDDs

1. The data is treated as text (or binary) files split into lines. Nothing is known about the structure of the data.
  * Every job starts with parsing that structure (see the parsing code from the previous seminar).
  * Sometimes you need to work with columns rather than keep entire rows.
2. It's inconvenient. We want something resembling SQL and/or pandas. "It forces analysts to be developers" (D. Lakhvich)
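
To make the first point concrete, here is a minimal plain-Python sketch (no Spark involved) of what every RDD-style job has to do by hand: split each line, cast the fields, and then rearrange rows into columns before working on a single field. The sample lines come from the `u.user` file shown later in these notes; the names `parse_line`, `rows` and `columns` are illustrative assumptions.

```python
# Raw lines as an RDD would see them: plain strings with no known structure.
lines = [
    "1|24|M|technician|85711",
    "2|53|F|other|94043",
    "3|23|M|writer|32067",
    "4|24|M|technician|43537",
    "5|33|F|other|15213",
]

def parse_line(line):
    """Split one line and cast the numeric fields by hand."""
    user_id, age, gender, occupation, zip_code = line.split("|")
    return int(user_id), int(age), gender, occupation, zip_code

rows = [parse_line(line) for line in lines]

# Row-oriented data must be transposed before we can work on one column.
columns = list(zip(*rows))
ages = columns[1]
mean_age = sum(ages) / len(ages)
print(mean_age)
```

With a DataFrame, the structure and the column access come for free once a schema is known.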

### Dataframes
* Similar to a DF in pandas or a table in SQL.
* Built on top of RDDs.

##### Creating a Spark session

`.getOrCreate()`: within an application the session (like SparkContext) exists as a singleton, so the call returns the existing session if there is one and creates it otherwise.

In [1]:
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName('Spark DF practice').master('yarn').getOrCreate()
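
The get-or-create behaviour can be modelled in plain Python. This is only a sketch of the pattern, not how `SparkSession` is implemented; the `Session` class and its `get_or_create` method are hypothetical names invented for the illustration.

```python
class Session:
    """Hypothetical stand-in for a session object; not part of PySpark."""

    _active = None  # the single shared instance, if one was created

    def __init__(self, app_name):
        self.app_name = app_name

    @classmethod
    def get_or_create(cls, app_name):
        # Create the instance only on the first call; reuse it afterwards.
        if cls._active is None:
            cls._active = cls(app_name)
        return cls._active

first = Session.get_or_create("Spark DF practice")
second = Session.get_or_create("another name")

print(first is second)    # both names refer to the same object
print(second.app_name)    # in this sketch the first call's settings win
```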

When working with DFs, use SparkSession.

In [2]:
spark

If you need an RDD, use SparkContext. It lives inside the SparkSession.

In [1]:
spark.sparkContext  # or simply `sc`

## Creating a DF

### Reading data from a source

In [2]:
! hdfs dfs -cat /data/users_csv/u.user | head -5

1|24|M|technician|85711
2|53|F|other|94043
3|23|M|writer|32067
4|24|M|technician|43537
5|33|F|other|15213


In [3]:
! hdfs dfs -du -h /data/users_csv/u.user

22.1 K  44.2 K  /data/users_csv/u.user


In [4]:
%%time
df = spark.read.format("csv")\
          .option("sep", "|")\
          .load("/data/users_csv/u.user")

CPU times: user 0 ns, sys: 4 ms, total: 4 ms
Wall time: 4.28 s


There are a lot of `option`s. For details on each format see [here](https://spark.apache.org/docs/2.4.4/api/java/org/apache/spark/sql/DataFrameReader.html#option-java.lang.String-java.lang.String-).

Why does this code take so long to run? And why, logically, shouldn't it?

In [5]:
df  # we can see the structure

DataFrame[_c0: string, _c1: string, _c2: string, _c3: string, _c4: string]

In [6]:
df.printSchema()

root
 |-- _c0: string (nullable = true)
 |-- _c1: string (nullable = true)
 |-- _c2: string (nullable = true)
 |-- _c3: string (nullable = true)
 |-- _c4: string (nullable = true)



Are these really strings? Let's check.

In [7]:
df.show(4)

+---+---+---+----------+-----+
|_c0|_c1|_c2|       _c3|  _c4|
+---+---+---+----------+-----+
|  1| 24|  M|technician|85711|
|  2| 53|  F|     other|94043|
|  3| 23|  M|    writer|32067|
|  4| 24|  M|technician|43537|
+---+---+---+----------+-----+
only showing top 4 rows



There are numbers (sequence number, age, and ID) and gender (essentially a boolean). We need to specify a schema.

In [8]:
# For each field specify a name, a type and, optionally, whether it is nullable
from pyspark.sql.types import StructType, StructField, IntegerType, StringType, LongType
schema = StructType(fields=[
    StructField("user_id", IntegerType()),
    StructField("age", IntegerType()),
    StructField("gender", StringType()),
    StructField("occupation", StringType()),
    StructField("zip", IntegerType())
])

In [9]:
%%time
df = spark.read.format("csv")\
          .schema(schema)\
          .option("sep", "|")\
          .load("/data/users_csv/u.user")

CPU times: user 0 ns, sys: 0 ns, total: 0 ns
Wall time: 18.9 ms


In [10]:
df

DataFrame[user_id: int, age: int, gender: string, occupation: string, zip: int]

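Now we see the schema. Without an explicit schema Spark has to look into the file itself to work out the columns (and, with `inferSchema` enabled, to guess their types), which is slow and not very reliable. That is why the first read took 4.28 s while this one took 18.9 ms.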
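
Why guessing types is expensive: a column can only be declared numeric after every value has been checked, so inference needs a pass over the data. A minimal plain-Python sketch of that idea follows. It is not Spark's actual inference code; `infer_type` and the sample values are assumptions made for illustration.

```python
def infer_type(values):
    """Return 'int' only if every value parses as an integer, else 'string'."""
    for value in values:
        try:
            int(value)
        except ValueError:
            # One bad value anywhere in the column is enough to rule out 'int'.
            return "string"
    return "int"

ages = ["24", "53", "23", "24", "33"]
# 'V3N4P' is a made-up non-numeric postal code, added only for illustration.
zips = ["85711", "94043", "32067", "43537", "V3N4P"]

print(infer_type(ages))
print(infer_type(zips))
```

An explicit schema skips this scan entirely, at the price of having to decide up front what to do with values that do not fit.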

In [11]:
df.printSchema()

root
 |-- user_id: integer (nullable = true)
 |-- age: integer (nullable = true)
 |-- gender: string (nullable = true)
 |-- occupation: string (nullable = true)
 |-- zip: integer (nullable = true)



There is a `summary` transformation that, much like `describe` in pandas, computes statistics. Because it is a transformation, you have to call an action (`show`) to actually get the result.

In [12]:
df.summary().show(10)

+-------+-----------------+-----------------+------+-------------+------------------+
|summary|          user_id|              age|gender|   occupation|               zip|
+-------+-----------------+-----------------+------+-------------+------------------+
|  count|              925|              925|   925|          925|               925|
|   mean|470.2908108108108|34.06054054054054|  null|         null| 50868.78810810811|
| stddev|272.1030147185632|12.25807489536592|  null|         null|30891.373254138176|
|    min|                1|                7|     F|administrator|                 0|
|    25%|              236|               25|  null|         null|             21227|
|    50%|              469|               31|  null|         null|             53711|
|    75%|              705|               43|  null|         null|             78741|
|    max|              943|               73|     M|       writer|             99835|
+-------+-----------------+-----------------+------+-------------+------------------+
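
The difference between a transformation and an action can be imitated with Python generators. This is only an analogy for lazy evaluation, not Spark's execution model; `read_ages`, `log` and the sample ages are assumptions for the sketch.

```python
log = []  # records when work actually happens

def read_ages():
    # Generator: nothing runs until something iterates over it.
    for age in [24, 53, 23, 24, 33]:
        log.append(f"read {age}")
        yield age

# "Transformation": builds a lazy pipeline, no data is read yet.
over_30 = (age for age in read_ages() if age > 30)
work_before_action = len(log)

# "Action": consuming the pipeline forces the computation.
result = list(over_30)
work_after_action = len(log)

print(work_before_action, work_after_action, result)
```

Defining the pipeline does no work; only the final `list(...)` call, playing the role of `show`, triggers the reads.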

### Converting from an RDD

In [13]:
# An RDD knows nothing about delimiters inside lines, so we have to parse them ourselves
rdd = sc.textFile("/data/users_csv/u.user").map(lambda x: x.split("|"))

In [14]:
rdd.take(5)

[['1', '24', 'M', 'technician', '85711'],
 ['2', '53', 'F', 'other', '94043'],
 ['3', '23', 'M', 'writer', '32067'],
 ['4', '24', 'M', 'technician', '43537'],
 ['5', '33', 'F', 'other', '15213']]

In [15]:
%%time
# Convert to a DF
df = spark.createDataFrame(rdd, schema=schema)

CPU times: user 164 ms, sys: 20 ms, total: 184 ms
Wall time: 385 ms


In [16]:
df.show(10)

Py4JJavaError: An error occurred while calling o98.showString.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 5.0 failed 4 times, most recent failure: Lost task 0.3 in stage 5.0 (TID 8, mipt-node06.atp-fivt.org, executor 2): org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/opt/cloudera/parcels/SPARK2-2.4.0.cloudera2-1.cdh5.13.3.p0.1041012/lib/spark2/python/pyspark/worker.py", line 377, in main
    process()
  File "/opt/cloudera/parcels/SPARK2-2.4.0.cloudera2-1.cdh5.13.3.p0.1041012/lib/spark2/python/pyspark/worker.py", line 372, in process
    serializer.dump_stream(func(split_index, iterator), outfile)
  File "/opt/cloudera/parcels/SPARK2-2.4.0.cloudera2-1.cdh5.13.3.p0.1041012/lib/spark2/python/pyspark/serializers.py", line 393, in dump_stream
    vs = list(itertools.islice(iterator, batch))
  File "/opt/cloudera/parcels/SPARK2-2.4.0.cloudera2-1.cdh5.13.3.p0.1041012/lib/spark2/python/pyspark/util.py", line 99, in wrapper
    return f(*args, **kwargs)
  File "/opt/cloudera/parcels/SPARK2-2.4.0.cloudera2-1.cdh5.13.3.p0.1041012/lib/spark2/python/pyspark/sql/session.py", line 730, in prepare
    verify_func(obj)
  File "/opt/cloudera/parcels/SPARK2-2.4.0.cloudera2-1.cdh5.13.3.p0.1041012/lib/spark2/python/pyspark/sql/types.py", line 1389, in verify
    verify_value(obj)
  File "/opt/cloudera/parcels/SPARK2-2.4.0.cloudera2-1.cdh5.13.3.p0.1041012/lib/spark2/python/pyspark/sql/types.py", line 1370, in verify_struct
    verifier(v)
  File "/opt/cloudera/parcels/SPARK2-2.4.0.cloudera2-1.cdh5.13.3.p0.1041012/lib/spark2/python/pyspark/sql/types.py", line 1389, in verify
    verify_value(obj)
  File "/opt/cloudera/parcels/SPARK2-2.4.0.cloudera2-1.cdh5.13.3.p0.1041012/lib/spark2/python/pyspark/sql/types.py", line 1315, in verify_integer
    verify_acceptable_types(obj)
  File "/opt/cloudera/parcels/SPARK2-2.4.0.cloudera2-1.cdh5.13.3.p0.1041012/lib/spark2/python/pyspark/sql/types.py", line 1278, in verify_acceptable_types
    % (dataType, obj, type(obj))))
TypeError: field user_id: IntegerType can not accept object '1' in type <class 'str'>

	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:452)
	at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRunner.scala:588)
	at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRunner.scala:571)
	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:406)
	at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
	at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:440)
	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
	at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
	at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$13$$anon$1.hasNext(WholeStageCodegenExec.scala:636)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:255)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:247)
	at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$24.apply(RDD.scala:836)
	at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$24.apply(RDD.scala:836)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
	at org.apache.spark.scheduler.Task.run(Task.scala:121)
	at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:408)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1405)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)

Driver stacktrace:
	at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1889)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1877)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1876)
	at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1876)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:926)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:926)
	at scala.Option.foreach(Option.scala:257)
	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:926)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2110)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2059)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2048)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:737)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2061)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2082)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2101)
	at org.apache.spark.sql.execution.SparkPlan.executeTake(SparkPlan.scala:365)
	at org.apache.spark.sql.execution.CollectLimitExec.executeCollect(limit.scala:38)
	at org.apache.spark.sql.Dataset.org$apache$spark$sql$Dataset$$collectFromPlan(Dataset.scala:3383)
	at org.apache.spark.sql.Dataset$$anonfun$head$1.apply(Dataset.scala:2544)
	at org.apache.spark.sql.Dataset$$anonfun$head$1.apply(Dataset.scala:2544)
	at org.apache.spark.sql.Dataset$$anonfun$53.apply(Dataset.scala:3364)
	at org.apache.spark.sql.execution.SQLExecution$$anonfun$withNewExecutionId$1.apply(SQLExecution.scala:78)
	at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:125)
	at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:73)
	at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3363)
	at org.apache.spark.sql.Dataset.head(Dataset.scala:2544)
	at org.apache.spark.sql.Dataset.take(Dataset.scala:2758)
	at org.apache.spark.sql.Dataset.getRows(Dataset.scala:254)
	at org.apache.spark.sql.Dataset.showString(Dataset.scala:291)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.GatewayConnection.run(GatewayConnection.java:238)
	at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/opt/cloudera/parcels/SPARK2-2.4.0.cloudera2-1.cdh5.13.3.p0.1041012/lib/spark2/python/pyspark/worker.py", line 377, in main
    process()
  File "/opt/cloudera/parcels/SPARK2-2.4.0.cloudera2-1.cdh5.13.3.p0.1041012/lib/spark2/python/pyspark/worker.py", line 372, in process
    serializer.dump_stream(func(split_index, iterator), outfile)
  File "/opt/cloudera/parcels/SPARK2-2.4.0.cloudera2-1.cdh5.13.3.p0.1041012/lib/spark2/python/pyspark/serializers.py", line 393, in dump_stream
    vs = list(itertools.islice(iterator, batch))
  File "/opt/cloudera/parcels/SPARK2-2.4.0.cloudera2-1.cdh5.13.3.p0.1041012/lib/spark2/python/pyspark/util.py", line 99, in wrapper
    return f(*args, **kwargs)
  File "/opt/cloudera/parcels/SPARK2-2.4.0.cloudera2-1.cdh5.13.3.p0.1041012/lib/spark2/python/pyspark/sql/session.py", line 730, in prepare
    verify_func(obj)
  File "/opt/cloudera/parcels/SPARK2-2.4.0.cloudera2-1.cdh5.13.3.p0.1041012/lib/spark2/python/pyspark/sql/types.py", line 1389, in verify
    verify_value(obj)
  File "/opt/cloudera/parcels/SPARK2-2.4.0.cloudera2-1.cdh5.13.3.p0.1041012/lib/spark2/python/pyspark/sql/types.py", line 1370, in verify_struct
    verifier(v)
  File "/opt/cloudera/parcels/SPARK2-2.4.0.cloudera2-1.cdh5.13.3.p0.1041012/lib/spark2/python/pyspark/sql/types.py", line 1389, in verify
    verify_value(obj)
  File "/opt/cloudera/parcels/SPARK2-2.4.0.cloudera2-1.cdh5.13.3.p0.1041012/lib/spark2/python/pyspark/sql/types.py", line 1315, in verify_integer
    verify_acceptable_types(obj)
  File "/opt/cloudera/parcels/SPARK2-2.4.0.cloudera2-1.cdh5.13.3.p0.1041012/lib/spark2/python/pyspark/sql/types.py", line 1278, in verify_acceptable_types
    % (dataType, obj, type(obj))))
TypeError: field user_id: IntegerType can not accept object '1' in type <class 'str'>

	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:452)
	at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRunner.scala:588)
	at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRunner.scala:571)
	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:406)
	at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
	at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:440)
	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
	at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
	at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$13$$anon$1.hasNext(WholeStageCodegenExec.scala:636)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:255)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:247)
	at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$24.apply(RDD.scala:836)
	at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$24.apply(RDD.scala:836)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
	at org.apache.spark.scheduler.Task.run(Task.scala:121)
	at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:408)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1405)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	... 1 more


In [17]:
# Disable schema verification
df = spark.createDataFrame(rdd, schema=schema, verifySchema=False)

In [18]:
df.show(5)

+-------+----+------+----------+----+
|user_id| age|gender|occupation| zip|
+-------+----+------+----------+----+
|   null|null|     M|technician|null|
|   null|null|     F|     other|null|
|   null|null|     M|    writer|null|
|   null|null|     M|technician|null|
|   null|null|     F|     other|null|
+-------+----+------+----------+----+
only showing top 5 rows



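No errors... and no data either: with verification off, the string values that do not match `IntegerType` silently become `null`. What now? Patch up the RDD by hand :(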

In [19]:
rdd = rdd.map(lambda x: (int(x[0]), int(x[1]), x[2], x[3], int(x[4])))
df = spark.createDataFrame(rdd, schema=schema)
df.show(5)

+-------+---+------+----------+-----+
|user_id|age|gender|occupation|  zip|
+-------+---+------+----------+-----+
|      1| 24|     M|technician|85711|
|      2| 53|     F|     other|94043|
|      3| 23|     M|    writer|32067|
|      4| 24|     M|technician|43537|
|      5| 33|     F|     other|15213|
+-------+---+------+----------+-----+
only showing top 5 rows
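
The lambda above raises an exception on the first value that is not a valid integer. A slightly more defensive variant is sketched below in plain Python; `to_int` and `parse_user` are hypothetical helper names, and the malformed row is made up for the example.

```python
def to_int(text):
    """Cast to int, returning None (a null) instead of raising on bad input."""
    try:
        return int(text)
    except ValueError:
        return None

def parse_user(fields):
    """Turn one split line into a typed tuple matching the schema order."""
    user_id, age, gender, occupation, zip_code = fields
    return (to_int(user_id), to_int(age), gender, occupation, to_int(zip_code))

good = parse_user(["1", "24", "M", "technician", "85711"])
# 'V3N4P' is a made-up non-numeric value used only to exercise the fallback.
bad = parse_user(["6", "42", "M", "executive", "V3N4P"])
print(good)
print(bad)
```

In the notebook such a function could presumably be passed to the RDD as `rdd.map(parse_user)` in place of the lambda, applied to the freshly split rows.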



In [20]:
! hdfs dfs -cat /data/user_logs/logsM2.txt | head -5

33.49.147.163	20140101014611	http://news.rambler.ru/3105700	378	431	Safari/5.0 (compatible; MSIE 9.0; Windows NT 6.1; Win64; x64; Trident/5.0; .NET CLR 3.5.30729;)
197.72.248.141	20140101020306	http://news.mail.ru/6344933	1412	203	Safari/5.0 (compatible; MSIE 9.0; Windows NT 6.1; WOW64; Trident/5.0; .NET CLR 3.5.30729; .NET CLR 3.0.30729;
33.49.147.163	20140101023103	http://lenta.ru/4303000	1189	451	Chrome/5.0 (compatible; MSIE 9.0; Windows NT 6.1; Win64; x64; Trident/5.0)
75.208.40.166	20140101032909	http://newsru.com/3330815	60	306	Safari/5.0 (Windows; U; MSIE 9.0; Windows NT 8.1; Trident/5.0; .NET4.0E; en-AU)
197.72.248.141	20140101033626	http://newsru.com/1588977	736	307	Chrome/5.0 (compatible; MSIE 9.0; Windows NT 8.0; WOW64; Trident/5.0; .NET CLR 2.7.40781; .NET4.0E; en-SG)
cat: Unable to write to output stream.


In [21]:
log_schema = StructType(fields=[
    StructField("ip", StringType()),
    StructField("timestamp", LongType()),
    StructField("url", StringType()),
    StructField("size", IntegerType()),
    StructField("code", IntegerType()),
    StructField("ua", StringType())
])

In [22]:
log_df = spark.read.csv("/data/user_logs/logsM2.txt", sep="\t", schema=log_schema)

In [23]:
log_df.show(5, truncate=False, vertical=True)

-RECORD 0------------------------------------------------------------------------------------------------------------------
 ip        | 33.49.147.163                                                                                                 
 timestamp | 20140101014611                                                                                                
 url       | http://news.rambler.ru/3105700                                                                                
 size      | 378                                                                                                           
 code      | 431                                                                                                           
 ua        | Safari/5.0 (compatible; MSIE 9.0; Windows NT 6.1; Win64; x64; Trident/5.0; .NET CLR 3.5.30729;)               
-RECORD 1------------------------------------------------------------------------------------------------------------------
 ip     

In [24]:
log_df.rdd.getNumPartitions()  # one file, so just one partition here

1

In [25]:
log_df = log_df.repartition(4)  # increase the number of partitions
log_df.rdd.getNumPartitions()

4
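
The physical plans later in these notes show `repartition(4)` as `Exchange RoundRobinPartitioning(4)`. A rough plain-Python model of round-robin distribution is given below. It is an illustration only (Spark's real implementation differs in details such as the starting position), and `round_robin` is a hypothetical name.

```python
def round_robin(records, num_partitions):
    """Deal records into partitions one at a time, like dealing cards."""
    partitions = [[] for _ in range(num_partitions)]
    for index, record in enumerate(records):
        partitions[index % num_partitions].append(record)
    return partitions

records = list(range(10))          # stand-in for the rows of one partition
parts = round_robin(records, 4)
print([len(p) for p in parts])
```

The partitions end up nearly equal in size, which is the point of repartitioning before parallel work.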

### Simple operations on a DF
Recall the relational model:
 * Projection (`SELECT`): a subset of columns.
 * Filter (`WHERE`, `HAVING`): a subset of rows.
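
Both operations can be shown on a plain list of dictionaries. This is a sketch without Spark; `project` and `select_rows` are hypothetical helpers, and the three rows are taken from the head of the log file shown above.

```python
rows = [
    {"ip": "33.49.147.163", "url": "http://news.rambler.ru/3105700", "code": 431},
    {"ip": "197.72.248.141", "url": "http://news.mail.ru/6344933", "code": 203},
    {"ip": "33.49.147.163", "url": "http://lenta.ru/4303000", "code": 451},
]

def project(rows, names):
    """Projection: keep only the listed columns of every row."""
    return [{name: row[name] for name in names} for row in rows]

def select_rows(rows, predicate):
    """Filter: keep only the rows for which the predicate holds."""
    return [row for row in rows if predicate(row)]

print(project(rows, ["ip", "code"]))
print(select_rows(rows, lambda row: row["code"] > 400))
```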

In [26]:
log_df.schema.fieldNames()

['ip', 'timestamp', 'url', 'size', 'code', 'ua']

#### Projection

In [27]:
log_df.select(["ip", "timestamp", "url"]).show(5)

+---------------+--------------+--------------------+
|             ip|     timestamp|                 url|
+---------------+--------------+--------------------+
| 197.72.248.141|20140313153714|http://lenta.ru/9...|
|  75.208.40.166|20140210083843|http://news.rambl...|
| 222.131.187.37|20140425170839|http://lenta.ru/2...|
|   49.203.96.67|20140225042119|http://news.yande...|
|135.124.143.193|20140313110843|http://news.mail....|
+---------------+--------------+--------------------+
only showing top 5 rows



In [28]:
log_df.select(*log_df.schema.fieldNames()[:3])

DataFrame[ip: string, timestamp: bigint, url: string]

In [29]:
log_df.select("ip", "timestamp", "url")

DataFrame[ip: string, timestamp: bigint, url: string]

These are all transformations. The result is again a DF.

In [30]:
log_df.where("code == 200").show(5)

+--------------+--------------+--------------------+----+----+--------------------+
|            ip|     timestamp|                 url|size|code|                  ua|
+--------------+--------------+--------------------+----+----+--------------------+
| 75.208.40.166|20140326084231|http://news.yande...| 955| 200|Opera/5.0 (Window...|
|197.72.248.141|20140404122749|http://newsru.com...| 884| 200|Opera/5.0 (compat...|
| 75.208.40.166|20140216075757|http://news.rambl...|1337| 200|Firefox/5.0 (comp...|
|168.255.93.197|20140213033139|http://news.yande...|1004| 200|Chrome/5.0 (Windo...|
|197.72.248.141|20140403130026|http://news.yande...| 856| 200|Opera/5.0 (compat...|
+--------------+--------------+--------------------+----+----+--------------------+
only showing top 5 rows



In [31]:
log_df.filter(log_df.code == 200).show(5)

+--------------+--------------+--------------------+----+----+--------------------+
|            ip|     timestamp|                 url|size|code|                  ua|
+--------------+--------------+--------------------+----+----+--------------------+
| 75.208.40.166|20140326084231|http://news.yande...| 955| 200|Opera/5.0 (Window...|
|197.72.248.141|20140404122749|http://newsru.com...| 884| 200|Opera/5.0 (compat...|
| 75.208.40.166|20140216075757|http://news.rambl...|1337| 200|Firefox/5.0 (comp...|
|168.255.93.197|20140213033139|http://news.yande...|1004| 200|Chrome/5.0 (Windo...|
|197.72.248.141|20140403130026|http://news.yande...| 856| 200|Opera/5.0 (compat...|
+--------------+--------------+--------------------+----+----+--------------------+
only showing top 5 rows



In [32]:
log_df.filter("code == 200 AND url LIKE '%rambler%'").show(5)

+-------------+--------------+--------------------+----+----+--------------------+
|           ip|     timestamp|                 url|size|code|                  ua|
+-------------+--------------+--------------------+----+----+--------------------+
|33.49.147.163|20140317090245|http://news.rambl...| 166| 200|Safari/5.0 (Windo...|
|75.208.40.166|20140310132240|http://news.rambl...|1107| 200|Opera/5.0 (compat...|
|33.49.147.163|20140330043133|http://news.rambl...| 945| 200|Chrome/5.0 (compa...|
|33.49.147.163|20140329134621|http://news.rambl...|1593| 200|Firefox/5.0 (comp...|
|33.49.147.163|20140106045941|http://news.rambl...| 671| 200|Firefox/5.0 (comp...|
+-------------+--------------+--------------------+----+----+--------------------+
only showing top 5 rows



In [33]:
log_df.filter(log_df.code.isin([200, 399]) & log_df.url.like("%rambler%")).show(5)

+-------------+--------------+--------------------+----+----+--------------------+
|           ip|     timestamp|                 url|size|code|                  ua|
+-------------+--------------+--------------------+----+----+--------------------+
|33.49.147.163|20140317090245|http://news.rambl...| 166| 200|Safari/5.0 (Windo...|
|75.208.40.166|20140310132240|http://news.rambl...|1107| 200|Opera/5.0 (compat...|
|33.49.147.163|20140330043133|http://news.rambl...| 945| 200|Chrome/5.0 (compa...|
|33.49.147.163|20140329134621|http://news.rambl...|1593| 200|Firefox/5.0 (comp...|
|33.49.147.163|20140106045941|http://news.rambl...| 671| 200|Firefox/5.0 (comp...|
+-------------+--------------+--------------------+----+----+--------------------+
only showing top 5 rows



In [34]:
log_df.select(log_df.ip, log_df.code.alias("response_code")).show(5)

+---------------+-------------+
|             ip|response_code|
+---------------+-------------+
|   25.62.10.220|          306|
|   49.105.15.79|          101|
|   49.105.15.79|          101|
|135.124.143.193|          303|
|135.124.143.193|          303|
+---------------+-------------+
only showing top 5 rows



In [40]:
import pyspark.sql.functions as f
log_df.select("ip", f.col("code").alias("response_code")).show(5)

+---------------+-------------+
|             ip|response_code|
+---------------+-------------+
|   25.62.10.220|          306|
|   49.105.15.79|          101|
|   49.105.15.79|          101|
|135.124.143.193|          303|
|135.124.143.193|          303|
+---------------+-------------+
only showing top 5 rows



In [35]:
log_df[log_df.ip, "code"].show(5)

+---------------+----+
|             ip|code|
+---------------+----+
|   25.62.10.220| 306|
|   49.105.15.79| 101|
|   49.105.15.79| 101|
|135.124.143.193| 303|
|135.124.143.193| 303|
+---------------+----+
only showing top 5 rows



In [36]:
log_df[(log_df.code == 404) & (log_df.url.like("%rambler%"))][log_df.ip, "code"].show(5)

+--------------+----+
|            ip|code|
+--------------+----+
|  3.183.113.77| 404|
|110.91.102.196| 404|
|168.255.93.197| 404|
|197.72.248.141| 404|
|197.72.248.141| 404|
+--------------+----+
only showing top 5 rows



In [37]:
query_str = """
SELECT ip, code FROM log_df
WHERE code == 404 AND url LIKE "%rambler%" 
"""
log_df.createOrReplaceTempView("log_df")  # registerTempTable is deprecated since Spark 2.0

In [38]:
spark.sql(query_str).show(5)

+--------------+----+
|            ip|code|
+--------------+----+
|  3.183.113.77| 404|
|110.91.102.196| 404|
|168.255.93.197| 404|
|197.72.248.141| 404|
|197.72.248.141| 404|
+--------------+----+
only showing top 5 rows
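
The same kind of query can be tried without a cluster using Python's built-in `sqlite3` module. SQLite is only a stand-in here: its dialect is not identical to Spark SQL (the sketch uses `=` and single-quoted string literals), and the three rows are made up in the shape of the log file.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE log_df (ip TEXT, url TEXT, code INTEGER)")
conn.executemany(
    "INSERT INTO log_df VALUES (?, ?, ?)",
    [
        # Made-up rows in the shape of the log file, for illustration only.
        ("10.0.0.1", "http://news.rambler.ru/1", 404),
        ("10.0.0.2", "http://news.rambler.ru/2", 200),
        ("10.0.0.3", "http://lenta.ru/3", 404),
    ],
)

query = """
SELECT ip, code FROM log_df
WHERE code = 404 AND url LIKE '%rambler%'
"""
result = conn.execute(query).fetchall()
print(result)
conn.close()
```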



In [41]:
log_df.select("ua", f.length("ua").alias("length")).show(5)

+--------------------+------+
|                  ua|length|
+--------------------+------+
|Safari/5.0 (compa...|    95|
|Safari/5.0 (compa...|    95|
|Safari/5.0 (compa...|    95|
|Safari/5.0 (compa...|    95|
|Safari/5.0 (compa...|    95|
+--------------------+------+
only showing top 5 rows



In [42]:
log_df.select(f.concat("url", f.lit("?key=value&key2=val2")).alias('my')).show(5, truncate=False, vertical=True)

-RECORD 0-------------------------------------------------
 my  | http://newsru.com/4608402?key=value&key2=val2      
-RECORD 1-------------------------------------------------
 my  | http://news.yandex.ru/4761980?key=value&key2=val2  
-RECORD 2-------------------------------------------------
 my  | http://lenta.ru/2662415?key=value&key2=val2        
-RECORD 3-------------------------------------------------
 my  | http://news.mail.ru/5550951?key=value&key2=val2    
-RECORD 4-------------------------------------------------
 my  | http://news.rambler.ru/2396720?key=value&key2=val2 
only showing top 5 rows



In [43]:
log_df.select('ua', f.split('ua', ' ')).show(2, vertical=True, truncate=False)

-RECORD 0------------------------------------------------------------------------------------------------------------------
 ua           | Safari/5.0 (compatible; MSIE 9.0; Windows NT 6.1; WOW64; Trident/5.0; chromeframe/12.0.742.112)            
 split(ua,  ) | [Safari/5.0, (compatible;, MSIE, 9.0;, Windows, NT, 6.1;, WOW64;, Trident/5.0;, chromeframe/12.0.742.112)] 
-RECORD 1------------------------------------------------------------------------------------------------------------------
 ua           | Safari/5.0 (compatible; MSIE 9.0; Windows NT 6.1; WOW64; Trident/5.0; chromeframe/12.0.742.112)            
 split(ua,  ) | [Safari/5.0, (compatible;, MSIE, 9.0;, Windows, NT, 6.1;, WOW64;, Trident/5.0;, chromeframe/12.0.742.112)] 
only showing top 2 rows



In [44]:
log_df.select('ua', f.split('ua', ' ').alias('parsed')).select(f.col('parsed')[0], f.col('parsed')[8]).show(5)

+----------+------------+
| parsed[0]|   parsed[8]|
+----------+------------+
|Safari/5.0|Trident/5.0;|
|Safari/5.0|Trident/5.0;|
|Safari/5.0|Trident/5.0;|
|Safari/5.0|Trident/5.0;|
|Safari/5.0|Trident/5.0;|
+----------+------------+
only showing top 5 rows



In [45]:
log_df.select('ua', f.split('ua', ' ').alias('parsed')).select(f.explode('parsed').alias('parsed_exploded'))\
    .groupby('parsed_exploded').count().orderBy("count").show(5)

+--------------------+-----+
|     parsed_exploded|count|
+--------------------+-----+
|          3.0.30729;|  962|
|          3.5.30729;|  962|
|        Trident/5.0)|  971|
|chromeframe/12.0....|  983|
|                 en)| 1004|
+--------------------+-----+
only showing top 5 rows
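
What the `split` / `explode` / `groupby` / `count` chain does can be traced in plain Python. This is a sketch on two sample user-agent strings modelled on the log, not on the real data.

```python
from collections import Counter

user_agents = [
    "Safari/5.0 (compatible; MSIE 9.0; Windows NT 6.1; Win64; x64; Trident/5.0)",
    "Chrome/5.0 (compatible; MSIE 9.0; Windows NT 6.1; Win64; x64; Trident/5.0)",
]

# split: one list of tokens per row.
parsed = [ua.split(" ") for ua in user_agents]

# explode: one output row per token.
exploded = [token for tokens in parsed for token in tokens]

# groupby + count, then sort by count ascending (as orderBy("count") does).
counts = sorted(Counter(exploded).items(), key=lambda item: item[1])
print(counts[:3])
```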



In [46]:
! hdfs dfs -cat /data/user_logs/ip_data_M/ipDataM.txt | head

49.105.15.79	Komi
110.91.102.196	Chelyabinsk Oblast
56.167.169.126	Saint Petersburg
75.208.40.166	Ulyanovsk Oblast
168.255.93.197	Irkutsk Oblast
75.208.40.166	Arkhangelsk Oblast
110.91.102.196	Zabaykalsky Krai
75.208.40.166	Tyumen Oblast
56.167.169.126	Tomsk Oblast
75.208.40.166	Sakha
cat: Unable to write to output stream.


In [47]:
! hdfs dfs -du -h /data/user_logs/ip_data_M/ipDataM.txt

239.3 K  478.6 K  /data/user_logs/ip_data_M/ipDataM.txt


In [48]:
ip_schema = StructType(fields=[
    StructField('ip', StringType()),
    StructField('region', StringType())
])

In [49]:
ips = spark.read.csv("/data/user_logs/ip_data_M/ipDataM.txt", sep="\t", schema=ip_schema)

In [50]:
ips.rdd.getNumPartitions()

1

In [51]:
ips.show(5)

+--------------+------------------+
|            ip|            region|
+--------------+------------------+
|  49.105.15.79|              Komi|
|110.91.102.196|Chelyabinsk Oblast|
|56.167.169.126|  Saint Petersburg|
| 75.208.40.166|  Ulyanovsk Oblast|
|168.255.93.197|    Irkutsk Oblast|
+--------------+------------------+
only showing top 5 rows



In [52]:
# Lower the broadcast threshold (in bytes) so that Spark does not broadcast `ips` automatically
spark.sql("SET spark.sql.autoBroadcastJoinThreshold = 100")
logs_with_reg = log_df.join(ips, on='ip', how='inner')

In [53]:
logs_with_reg.explain()

== Physical Plan ==
*(5) Project [ip#505, timestamp#506L, url#507, size#508, code#509, ua#510, region#801]
+- *(5) SortMergeJoin [ip#505], [ip#800], Inner
   :- *(2) Sort [ip#505 ASC NULLS FIRST], false, 0
   :  +- Exchange hashpartitioning(ip#505, 200)
   :     +- Exchange RoundRobinPartitioning(4)
   :        +- *(1) Project [ip#505, timestamp#506L, url#507, size#508, code#509, ua#510]
   :           +- *(1) Filter isnotnull(ip#505)
   :              +- *(1) FileScan csv [ip#505,timestamp#506L,url#507,size#508,code#509,ua#510] Batched: false, Format: CSV, Location: InMemoryFileIndex[hdfs://mipt-master.atp-fivt.org:8020/data/user_logs/logsM2.txt], PartitionFilters: [], PushedFilters: [IsNotNull(ip)], ReadSchema: struct<ip:string,timestamp:bigint,url:string,size:int,code:int,ua:string>
   +- *(4) Sort [ip#800 ASC NULLS FIRST], false, 0
      +- Exchange hashpartitioning(ip#800, 200)
         +- *(3) Project [ip#800, region#801]
            +- *(3) Filter isnotnull(ip#800)
             

In [54]:
logs_with_reg.show(5)

+------------+--------------+--------------------+----+----+--------------------+-------------------+
|          ip|     timestamp|                 url|size|code|                  ua|             region|
+------------+--------------+--------------------+----+----+--------------------+-------------------+
|3.183.113.77|20140126111936|http://newsru.com...|1485| 412|Firefox/5.0 compa...|           Chukotka|
|3.183.113.77|20140126111936|http://newsru.com...|1485| 412|Firefox/5.0 compa...|     Ivanovo Oblast|
|3.183.113.77|20140126111936|http://newsru.com...|1485| 412|Firefox/5.0 compa...|          Tatarstan|
|3.183.113.77|20140126111936|http://newsru.com...|1485| 412|Firefox/5.0 compa...|Karachay–Cherkessia|
|3.183.113.77|20140126111936|http://newsru.com...|1485| 412|Firefox/5.0 compa...|   Yaroslavl Oblast|
+------------+--------------+--------------------+----+----+--------------------+-------------------+
only showing top 5 rows



In [55]:
logs_with_reg.rdd.getNumPartitions()

200

For `coalesce()` and `repartition()`, see the [PySpark documentation](https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.sql.DataFrame.coalesce.html).
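To make the difference concrete, here is a toy model in plain Python (not Spark code). Partitions are modelled as lists; `coalesce_sketch` only merges whole partitions, while `repartition_sketch` redistributes every row. Both helper names are hypothetical, and real Spark decides which partitions to merge and where rows land by its own rules. The sketch only illustrates that `coalesce()` avoids a full shuffle and `repartition()` performs one.

```python
def coalesce_sketch(partitions, n):
    """Merge existing partitions into at most n groups; no partition is split."""
    n = min(n, len(partitions))  # coalesce cannot increase the partition count
    merged = [[] for _ in range(n)]
    for i, part in enumerate(partitions):
        # A whole source partition always goes to a single target partition.
        merged[i * n // len(partitions)].extend(part)
    return merged


def repartition_sketch(partitions, n):
    """Redistribute every row over n new partitions (a full shuffle)."""
    rows = [row for part in partitions for row in part]
    return [rows[i::n] for i in range(n)]  # round-robin, so sizes come out even


skewed = [[1, 2, 3, 4, 5, 6], [7], [8], [9]]
coalesced_sizes = [len(p) for p in coalesce_sketch(skewed, 2)]
repartitioned_sizes = [len(p) for p in repartition_sketch(skewed, 2)]
print(coalesced_sizes)      # [7, 2]
print(repartitioned_sizes)  # [5, 4]
```

In this model the merged partitions stay skewed because nothing moves between them, whereas the shuffled ones are balanced at the price of moving every row.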

In [56]:
import pyspark.sql.functions as f
spark.sql("SET spark.sql.autoBroadcastJoinThreshold = 10")
logs_with_reg = log_df.join(f.broadcast(ips), on='ip', how='inner')
logs_with_reg.explain()

== Physical Plan ==
*(3) Project [ip#505, timestamp#506L, url#507, size#508, code#509, ua#510, region#801]
+- *(3) BroadcastHashJoin [ip#505], [ip#800], Inner, BuildRight
   :- Exchange RoundRobinPartitioning(4)
   :  +- *(1) Project [ip#505, timestamp#506L, url#507, size#508, code#509, ua#510]
   :     +- *(1) Filter isnotnull(ip#505)
   :        +- *(1) FileScan csv [ip#505,timestamp#506L,url#507,size#508,code#509,ua#510] Batched: false, Format: CSV, Location: InMemoryFileIndex[hdfs://mipt-master.atp-fivt.org:8020/data/user_logs/logsM2.txt], PartitionFilters: [], PushedFilters: [IsNotNull(ip)], ReadSchema: struct<ip:string,timestamp:bigint,url:string,size:int,code:int,ua:string>
   +- BroadcastExchange HashedRelationBroadcastMode(List(input[0, string, true]))
      +- *(2) Project [ip#800, region#801]
         +- *(2) Filter isnotnull(ip#800)
            +- *(2) FileScan csv [ip#800,region#801] Batched: false, Format: CSV, Location: InMemoryFileIndex[hdfs://mipt-master.atp-fivt.org:8
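
The two plans differ in how matching rows are brought together. The following plain-Python sketch (not Spark code; both function names and all sample rows are made up for illustration) mimics the idea. The sort-merge variant sorts both inputs by key and walks them in step, while the broadcast variant builds a hash table from the small side and streams the large side past it. One IP is deliberately listed under two regions, as happens in `ips`.

```python
from collections import defaultdict
from operator import itemgetter


def broadcast_hash_join(big, small):
    """Build a hash table from the small side, then stream the big side past it."""
    table = defaultdict(list)
    for key, value in small:
        table[key].append(value)
    return [(key, left, right) for key, left in big for right in table.get(key, [])]


def sort_merge_join(big, small):
    """Sort both sides by key, then merge runs of equal keys."""
    left = sorted(big, key=itemgetter(0))
    right = sorted(small, key=itemgetter(0))
    result, i, j = [], 0, 0
    while i < len(left) and j < len(right):
        if left[i][0] < right[j][0]:
            i += 1
        elif left[i][0] > right[j][0]:
            j += 1
        else:
            key = left[i][0]
            # Find the end of the run of equal keys on the right side.
            j_end = j
            while j_end < len(right) and right[j_end][0] == key:
                j_end += 1
            # Pair every left row of this key with every right row of the run.
            while i < len(left) and left[i][0] == key:
                result.extend((key, left[i][1], r[1]) for r in right[j:j_end])
                i += 1
            j = j_end
    return result


logs = [("49.105.15.79", "http://lenta.ru/a"),
        ("3.183.113.77", "http://newsru.com/b"),
        ("1.1.1.1", "http://example.com/c")]
regions = [("49.105.15.79", "Komi"),
           ("49.105.15.79", "Kalmykia"),
           ("3.183.113.77", "Chukotka")]

hashed = broadcast_hash_join(logs, regions)
merged = sort_merge_join(logs, regions)
print(sorted(hashed) == sorted(merged), len(hashed))  # True 3
```

Both strategies return the same rows. The log line whose IP has two regions appears twice, which is why a single log record shows up with several regions in the `show()` outputs.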

In [57]:
log_df.rdd.getNumPartitions()

4

In [58]:
ips.rdd.getNumPartitions()

1

In [59]:
logs_with_reg.rdd.getNumPartitions()

4

In [60]:
logs_with_reg.show(5)

+------------+--------------+--------------------+----+----+--------------------+--------------------+
|          ip|     timestamp|                 url|size|code|                  ua|              region|
+------------+--------------+--------------------+----+----+--------------------+--------------------+
|49.105.15.79|20140127041332|http://lenta.ru/5...| 184| 509|Chrome/5.0 compat...|North Ossetia–Alania|
|49.105.15.79|20140127041332|http://lenta.ru/5...| 184| 509|Chrome/5.0 compat...|            Kalmykia|
|49.105.15.79|20140127041332|http://lenta.ru/5...| 184| 509|Chrome/5.0 compat...|      Bryansk Oblast|
|49.105.15.79|20140127041332|http://lenta.ru/5...| 184| 509|Chrome/5.0 compat...|        Kursk Oblast|
|49.105.15.79|20140127041332|http://lenta.ru/5...| 184| 509|Chrome/5.0 compat...|              Udmurt|
+------------+--------------+--------------------+----+----+--------------------+--------------------+
only showing top 5 rows



In [61]:
logs_with_reg.groupby('region').agg(f.count('ip')).show(5)

+-----------------+---------+
|           region|count(ip)|
+-----------------+---------+
|    Kaluga Oblast|   111297|
|    Ryazan Oblast|    80716|
|  Smolensk Oblast|    99735|
|Sverdlovsk Oblast|    87004|
|          Mari El|    98414|
+-----------------+---------+
only showing top 5 rows



In [62]:
result = logs_with_reg.groupby('region').count().distinct().coalesce(2).withColumnRenamed('count', 'ip_count')
result.rdd.getNumPartitions()
#result.show()

2

In [63]:
result.write.csv("region_counts.tsv", sep='\t', mode='overwrite')

In [64]:
! hdfs dfs -ls region_counts.tsv

Found 3 items
-rw-r--r--   2 mtsion mtsion          0 2021-11-27 19:45 region_counts.tsv/_SUCCESS
-rw-r--r--   2 mtsion mtsion        809 2021-11-27 19:45 region_counts.tsv/part-00000-711fe1f7-2487-491f-9403-5bf5da013ead-c000.csv
-rw-r--r--   2 mtsion mtsion        841 2021-11-27 19:45 region_counts.tsv/part-00001-711fe1f7-2487-491f-9403-5bf5da013ead-c000.csv


In [65]:
# ! hdfs dfs -ls region_counts.tsv
! hdfs dfs -cat region_counts.tsv/part-00001-6da128c0-e084-4ae0-9c37-e603987abf44-c000.csv | head

cat: `region_counts.tsv/part-00001-6da128c0-e084-4ae0-9c37-e603987abf44-c000.csv': No such file or directory

The part file names contain a UUID that changes on every write, so a name copied from an earlier run no longer exists. A glob such as `region_counts.tsv/part-00001-*` avoids hard-coding it.


In [None]:
from pyspark.sql import Window
logs_with_reg.select("ip", "timestamp", f.count("*").over(Window.partitionBy('ip')).alias("cnt")).orderBy("cnt").show(5)
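
Unlike `groupby(...).count()`, a window aggregate keeps every input row and attaches the per-group value to it. Below is a plain-Python sketch of what `count("*").over(Window.partitionBy('ip'))` computes, on made-up rows. It imitates the semantics only, not how Spark executes the query.

```python
from collections import Counter

# Made-up (ip, timestamp) rows.
rows = [
    ("49.105.15.79", 20140127041332),
    ("3.183.113.77", 20140126111936),
    ("49.105.15.79", 20140127041340),
]

# Count rows per ip (the "partition"), then attach that count to every row.
per_ip = Counter(ip for ip, _ in rows)
with_cnt = [(ip, ts, per_ip[ip]) for ip, ts in rows]

# orderBy("cnt"): ascending by the attached count.
ordered = sorted(with_cnt, key=lambda r: r[2])
for row in ordered:
    print(row)
```

The result has as many rows as the input; only a `cnt` column is added.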

## Exercises

The HDFS directory `/data/access_logs/big_log` contains a log in which each line has the following fields:

* The user's IP address (`195.206.123.39`),
* Two fields that are not used here (`-` and `-`),
* Request time (`[24/Sep/2015:12:32:53 +0400]`),
* Request line (`"GET /id18222 HTTP/1.1"`),
* HTTP response code (`200`),
* Response size (`10703`),
* Referer (the page the visitor came from; `"http://bing.com/"`),
* Browser identification string (User-Agent; `"Mozilla/5.0 (Windows NT 6.1; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/37.0.2062.94 Safari/537.36"`).

Several data samples of different sizes are available:
```
3.4 G    10.2 G   /data/access_logs/big_log
17.6 M   52.7 M   /data/access_logs/big_log_10000
175.4 M  526.2 M  /data/access_logs/big_log_100000
```
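
Putting the sample values from the field list together gives a line to experiment with. This is a minimal sketch, assuming the fields are separated by single spaces as in the usual Apache combined log format. The assembled `sample` line is a reconstruction, not a line copied from the file, and the `pattern` below is only one possible way to split it.

```python
import re
from datetime import datetime

sample = (
    '195.206.123.39 - - [24/Sep/2015:12:32:53 +0400] '
    '"GET /id18222 HTTP/1.1" 200 10703 "http://bing.com/" '
    '"Mozilla/5.0 (Windows NT 6.1; Win64; x64) AppleWebKit/537.36 '
    '(KHTML, like Gecko) Chrome/37.0.2062.94 Safari/537.36"'
)

pattern = re.compile(
    r'(?P<ip>[\d.]+) \S+ \S+ \[(?P<time>[^\]]+)\] "(?P<request>[^"]*)" '
    r'(?P<status>\d+) (?P<size>\S+) "(?P<referer>[^"]*)" "(?P<ua>[^"]*)"'
)

m = pattern.match(sample)
method, url, protocol = m.group("request").split()
# %z consumes the "+0400" offset, so the result is a timezone-aware datetime.
ts = datetime.strptime(m.group("time"), "%d/%b/%Y:%H:%M:%S %z")
print(m.group("ip"), method, url, m.group("status"), m.group("size"), ts.hour)
```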

In [5]:
import re
import sys
from datetime import datetime as dt
from pyspark.sql import SparkSession, Row

log_format = re.compile(
    r"(?P<host>[\d\.]+)\s"
    r"(?P<identity>\S*)\s"
    r"(?P<user>\S*)\s"
    r"\[(?P<time>.*?)\]\s"
    r'"(?P<request>.*?)"\s'
    r"(?P<status>\d+)\s"
    r"(?P<bytes>\S*)\s"
    r'"(?P<referer>.*?)"\s'
    r'"(?P<user_agent>.*?)"\s*'
)


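def parseLine(line):
    """Parse one access-log line into a 9-field tuple (see the Row fields below)."""
    match = log_format.match(line)
    if not match:
        # Keep the same field types as for parsed lines (bytes and hour are ints),
        # so that schema inference in createDataFrame stays consistent.
        return ("", "", "", "", "", 0, "", "", 0)

    request = match.group('request').split()
    timestamp = match.group('time').split()[0]  # drop the timezone offset
    size = match.group('bytes')
    return (match.group('host'), timestamp,
        request[0], request[1], match.group('status'),
        int(size) if size.isdigit() else 0,  # guard against a non-numeric size such as "-"
        match.group('referer'), match.group('user_agent'),
        dt.strptime(timestamp, '%d/%b/%Y:%H:%M:%S').hour)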


if __name__ == "__main__":
    spark_session = SparkSession.builder.master("yarn").appName("501 df").config("spark.ui.port", "18089").getOrCreate()
    lines = spark_session.sparkContext.textFile("/data/access_logs/big_log_10000")
    parts = lines.map(parseLine)
    rows = parts.map(lambda p: Row(ip=p[0],
                                   timestamp=p[1],
                                   request_type=p[2],
                                   request_url=p[3],
                                   status=p[4],
                                   bytes=p[5],
                                   referer=p[6],
                                   user_agent=p[7],
                                   hour=p[8]))
    access_log_df = spark_session.createDataFrame(rows)

In [6]:
access_log_df.show(5)

+-----+----+---------------+-------+------------+-----------+------+--------------------+--------------------+
|bytes|hour|             ip|referer|request_type|request_url|status|           timestamp|          user_agent|
+-----+----+---------------+-------+------------+-----------+------+--------------------+--------------------+
|27513|   0|109.105.128.100|      -|         GET|   /id45574|   200|10/Dec/2015:00:00:00|Mozilla/5.0 (Wind...|
|11914|   0| 217.146.45.122|      -|         GET|   /id40851|   200|10/Dec/2015:00:00:00|Mozilla/5.0 (X11;...|
|32457|   0|   17.72.78.198|      -|         GET|   /id58931|   200|10/Dec/2015:00:00:00|Mozilla/5.0; TOB ...|
|26190|   0|  46.245.183.68|      -|         GET|   /id19513|   200|10/Dec/2015:00:00:00|Mozilla/5.0 (Wind...|
|14115|   0| 91.197.164.156|      -|         GET|   /id39028|   200|10/Dec/2015:00:00:01|Mozilla/5.0 (X11;...|
+-----+----+---------------+-------+------------+-----------+------+--------------------+--------------------+
only showing top 5 rows

### Exercise 1.
> Write a program that prints the top 5 IP addresses containing at least one digit 4, ranked by number of visits.
Each output line must contain the IP address and the number of visits separated by a tab, and the lines must be sorted by number of visits in descending order, for example:

```
195.206.123.39<TAB>40
196.206.123.40<TAB>39
191.206.123.41<TAB>38
175.206.123.42<TAB>37
195.236.123.43<TAB>36
```
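
The required logic, sketched in plain Python on made-up data to make the expected output format concrete. This is not the Spark solution. In the DataFrame version the same steps would presumably map to a filter on `ip`, `groupBy('ip').count()`, a descending sort, and `limit(5)`.

```python
from collections import Counter

# Made-up visits: one entry per request.
visits = ["195.206.123.39"] * 3 + ["10.0.0.4"] * 5 + ["8.8.8.8"] * 7 + ["4.4.4.4"] * 2

# Keep only addresses containing the digit 4, then count visits per address.
counts = Counter(ip for ip in visits if "4" in ip)

# most_common(5) returns the five largest counts in descending order.
lines = [f"{ip}\t{n}" for ip, n in counts.most_common(5)]
print("\n".join(lines))
```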

### Exercise 5.

> Write a program that prints the overall distribution of the number of visitors by hour (for each hour of the day, print the number of visitors who came during that hour). A visitor is identified by ip + user_agent.
The result must contain the hour of the day and the number of visitors, separated by a tab and sorted by hour. For example:
```
0<tab>10
1<tab>10
2<tab>10
…..
21<tab>30
22<tab>20
23<tab>10
```
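
The key step is to count each visitor at most once per hour. Here is a plain-Python sketch on made-up data (not the Spark solution): deduplicate `(ip, user_agent, hour)` triples, then count per hour. In a DataFrame this corresponds to dropping duplicates on those three columns before grouping by `hour`.

```python
from collections import Counter

# Made-up requests: (ip, user_agent, hour).
requests = [
    ("1.1.1.1", "Firefox", 0),
    ("1.1.1.1", "Firefox", 0),   # same visitor, same hour: counted once
    ("1.1.1.1", "Chrome", 0),    # same ip, other browser: a different visitor
    ("2.2.2.2", "Chrome", 1),
    ("1.1.1.1", "Firefox", 1),   # seen again in another hour: counted for hour 1 too
]

# set() removes duplicate triples; Counter then counts distinct visitors per hour.
visitors_per_hour = Counter(hour for _, _, hour in set(requests))

for hour in sorted(visitors_per_hour):
    print(f"{hour}\t{visitors_per_hour[hour]}")
```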

In [7]:
# .drop_duplicates(["ip", "user_agent", "hour"])