In [124]:
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, IntegerType, FloatType, StringType, BooleanType, ArrayType, DateType, TimestampNTZType

In [2]:
APP_NAME = "JSON_DF"

In [14]:
spark = SparkSession.builder.appName(APP_NAME) \
        .config("spark.default.parallelism", "8") \
        .master("local[8]") \
        .getOrCreate()

In [16]:
spark

In [15]:
spark.sparkContext.defaultParallelism

8

In [151]:
df_schema = StructType(
    [
        StructField("abstract", StringType(), True),
        StructField("authors", StringType(), True),
        StructField("authors_parsed", ArrayType(ArrayType(StringType())), True),
        StructField("categories", StringType(), True),
        StructField("comments", StringType(), True),
        StructField("doi", StringType(), True),
        StructField("id", StringType(), True),
        StructField("journal", StringType(), True),
        StructField("license", StringType(), True),
        StructField("report", StringType(), True),
        StructField("submitter", StringType(), True),
        StructField("title", StringType(), True),
        StructField("update_date", DateType(), True),
        StructField("element", StringType(), True),
        StructField("created", StringType(), True),
        StructField("versions", ArrayType(
                                StructType([
                                    StructField("created", StringType(), True),
                                    StructField("version", StringType(), True)
                                ])
        ), True),
    ]
)

In [67]:
df_infer = spark.read.json("/home/guilhermefmk/Documentos/labs_spark/data/arxivData.json")

                                                                                

In [153]:
df = spark.read.json("/home/guilhermefmk/Documentos/labs_spark/data/arxivData.json", schema=df_schema)

In [157]:
df.select(df['versions'].version).show(truncate=False)

+----------------+
|versions.version|
+----------------+
|[v1, v2]        |
|[v1, v2]        |
|[v1, v2, v3]    |
|[v1]            |
|[v1]            |
|[v1]            |
|[v1, v2]        |
|[v1, v2, v3]    |
|[v1]            |
|[v1]            |
|[v1, v2, v3]    |
|[v1]            |
|[v1, v2]        |
|[v1]            |
|[v1, v2]        |
|[v1]            |
|[v1]            |
|[v1, v2]        |
|[v1, v2]        |
|[v1]            |
+----------------+
only showing top 20 rows



In [158]:
df.printSchema()

root
 |-- abstract: string (nullable = true)
 |-- authors: string (nullable = true)
 |-- authors_parsed: array (nullable = true)
 |    |-- element: array (containsNull = true)
 |    |    |-- element: string (containsNull = true)
 |-- categories: string (nullable = true)
 |-- comments: string (nullable = true)
 |-- doi: string (nullable = true)
 |-- id: string (nullable = true)
 |-- journal: string (nullable = true)
 |-- license: string (nullable = true)
 |-- report: string (nullable = true)
 |-- submitter: string (nullable = true)
 |-- title: string (nullable = true)
 |-- update_date: date (nullable = true)
 |-- element: string (nullable = true)
 |-- created: string (nullable = true)
 |-- versions: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- created: string (nullable = true)
 |    |    |-- version: string (nullable = true)



In [88]:

df.show()

+--------------------+--------------------+--------------------+-----------------+--------------------+--------------------+---------+-------+--------------------+------+------------------+--------------------+-----------+--------------------+-------+-------+-------+
|            abstract|             authors|      authors_parsed|       categories|            comments|                 doi|       id|journal|             license|report|         submitter|               title|update_date|            versions|element|created|version|
+--------------------+--------------------+--------------------+-----------------+--------------------+--------------------+---------+-------+--------------------+------+------------------+--------------------+-----------+--------------------+-------+-------+-------+
|  A fully differe...|C. Bal\'azs, E. L...|[[Balázs, C., ], ...|           hep-ph|37 pages, 15 figu...|10.1103/PhysRevD....|0704.0001|   NULL|                NULL|  NULL|    Pavel Nadolsky|Calcula

In [159]:
df = df.fillna("unknown", subset=["license"])

In [160]:
df = df.dropna(subset=["comments"])

In [48]:
df.show()

+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+---------+-------+--------------------+------+------------------+--------------------+-----------+--------------------+-------+-------+-------+
|            abstract|             authors|      authors_parsed|          categories|            comments|                 doi|       id|journal|             license|report|         submitter|               title|update_date|            versions|element|created|version|
+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+---------+-------+--------------------+------+------------------+--------------------+-----------+--------------------+-------+-------+-------+
|  A fully differe...|C. Bal\'azs, E. L...|[[Balázs, C., ], ...|              hep-ph|37 pages, 15 figu...|10.1103/PhysRevD....|0704.0001|   NULL|             unknown|  NULL|    Pavel Nado

In [53]:
df.where(df["categories"].ilike('%math%')).show()

+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+---------+-------+--------------------+------+--------------------+--------------------+-----------+--------------------+-------+-------+-------+
|            abstract|             authors|      authors_parsed|          categories|            comments|                 doi|       id|journal|             license|report|           submitter|               title|update_date|            versions|element|created|version|
+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+---------+-------+--------------------+------+--------------------+--------------------+-----------+--------------------+-------+-------+-------+
|  We describe a n...|Ileana Streinu an...|[[Streinu, Ileana...|       math.CO cs.CG|To appear in Grap...|                NULL|0704.0002|   NULL|http://arxiv.org/...|  NULL|        

In [73]:
df.createOrReplaceTempView("books")

In [222]:
spark.sql(""" 
    SELECT 
            comments
    FROM
          BOOKS
    WHERE
          categories ILIKE '%math%'
""").show(truncate=False)

+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|comments                                                                                                                                                                                                                                                                                                                                              |
+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------

In [142]:
df.select(df["versions"]).show(truncate=False)

+---------------------------------------------------------------------------------------------------------------+
|versions                                                                                                       |
+---------------------------------------------------------------------------------------------------------------+
|[{Mon, 2 Apr 2007 19:18:42 GMT, v1}, {Tue, 24 Jul 2007 20:10:27 GMT, v2}]                                      |
|[{Sat, 31 Mar 2007 02:26:18 GMT, v1}, {Sat, 13 Dec 2008 17:26:00 GMT, v2}]                                     |
|[{Sun, 1 Apr 2007 20:46:54 GMT, v1}, {Sat, 8 Dec 2007 23:47:24 GMT, v2}, {Sun, 13 Jan 2008 00:36:28 GMT, v3}]  |
|[{Sat, 31 Mar 2007 03:16:14 GMT, v1}]                                                                          |
|[{Sat, 31 Mar 2007 04:24:59 GMT, v1}]                                                                          |
|[{Sat, 31 Mar 2007 04:27:22 GMT, v1}, {Wed, 22 Aug 2007 22:42:11 GMT, v2}]             

In [92]:
from pyspark.sql.functions import length, avg, col

In [93]:
df.where(length(df["abstract"])>5).count()

                                                                                

1567634

In [77]:
spark.sql("""
    SELECT
          COUNT(1)
    FROM
          books
    WHERE
          CHAR_LENGTH(abstract)>5
""").show()



+--------+
|count(1)|
+--------+
| 1567634|
+--------+



                                                                                

In [64]:
spark.sql("""
    SELECT
          DISTINCT(license)
    FROM
          books
    WHERE
          abstract REGEXP '%\(([A-Za-z][^_/\\<>]{5,})\)%'
""").show()



+--------------------+
|             license|
+--------------------+
|http://arxiv.org/...|
|             unknown|
|http://creativeco...|
|http://creativeco...|
|http://creativeco...|
|http://creativeco...|
|http://creativeco...|
|http://creativeco...|
+--------------------+



                                                                                

In [202]:
from pyspark.sql.functions import avg, col, regexp_extract, when, mean ,lit, filter, transform, struct, to_timestamp, regexp_replace, element_at


In [141]:
df.select(df['versions']).show(truncate=False)

+---------------------------------------------------------------------------------------------------------------+
|versions                                                                                                       |
+---------------------------------------------------------------------------------------------------------------+
|[{Mon, 2 Apr 2007 19:18:42 GMT, v1}, {Tue, 24 Jul 2007 20:10:27 GMT, v2}]                                      |
|[{Sat, 31 Mar 2007 02:26:18 GMT, v1}, {Sat, 13 Dec 2008 17:26:00 GMT, v2}]                                     |
|[{Sun, 1 Apr 2007 20:46:54 GMT, v1}, {Sat, 8 Dec 2007 23:47:24 GMT, v2}, {Sun, 13 Jan 2008 00:36:28 GMT, v3}]  |
|[{Sat, 31 Mar 2007 03:16:14 GMT, v1}]                                                                          |
|[{Sat, 31 Mar 2007 04:24:59 GMT, v1}]                                                                          |
|[{Sat, 31 Mar 2007 04:27:22 GMT, v1}, {Wed, 22 Aug 2007 22:42:11 GMT, v2}]             

In [225]:
df.select(
    "comments",
    regexp_extract("comments", r'(\d+)\s*pages',0)
).show(truncate=False)

+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+------------------------------------------+
|comments                                                                                                                                                                                                                                                                                                                   |regexp_extract(comments, (\d+)\s*pages, 0)|
+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------

In [237]:
df_avg = df.select(
    when(
        regexp_extract("comments", r'(\d+)\s*pages', 1) == lit(""), 
        0
    ).otherwise(
        regexp_extract("comments", r'(\d+)\s*pages', 1)
    ).cast("int").alias("number_of_pages"),
    "comments",
    element_at(filter(
        "versions",
        lambda v: v["version"] == "v1"
    ), 1).alias("version_filter")
    )

df_avg.groupBy(regexp_extract(df_avg.version_filter.created, "^([^,]+)", 1).alias("month_day")).agg(mean("number_of_pages")).show()



+---------+--------------------+
|month_day|avg(number_of_pages)|
+---------+--------------------+
|      Sun|  13.596806792783736|
|      Mon|  13.775644641277932|
|      Thu|   13.68858755347439|
|      Sat|  13.252786575540368|
|      Wed|   13.60628228710771|
|      Tue|  13.604387705619963|
|      Fri|  13.546319933878106|
+---------+--------------------+



                                                                                

In [236]:
df_avg.show()

+---------------+--------------------+--------------------+
|number_of_pages|            comments|      version_filter|
+---------------+--------------------+--------------------+
|             37|37 pages, 15 figu...|{Mon, 2 Apr 2007 ...|
|              0|To appear in Grap...|{Sat, 31 Mar 2007...|
|             23| 23 pages, 3 figures|{Sun, 1 Apr 2007 ...|
|             11|            11 pages|{Sat, 31 Mar 2007...|
|              6|6 pages, 4 figure...|{Sat, 31 Mar 2007...|
|             16|16 pages, no figu...|{Sat, 31 Mar 2007...|
|              0|   Minor corrections|{Sat, 31 Mar 2007...|
|             36|36 pages, 17 figures|{Sat, 31 Mar 2007...|
|             14|14 pages; title c...|{Sat, 31 Mar 2007...|
|             18|  18 pages, 1 figure|{Sun, 1 Apr 2007 ...|
|             22|22 pages; signs a...|{Mon, 2 Apr 2007 ...|
|             17|17 pages, 3 figur...|{Sat, 31 Mar 2007...|
|             10|10 pages, 11 figu...|{Sat, 31 Mar 2007...|
|             20|20 pages, v2: an ...|{M

In [None]:
df_avg.groupBy("month_day").agg(mean("number_of_pages")).show()


In [208]:
df_avg.show()

+---------------+--------------------+--------------------+
|number_of_pages|            comments|      version_filter|
+---------------+--------------------+--------------------+
|             37|37 pages, 15 figu...|{Mon, 2 Apr 2007 ...|
|              0|To appear in Grap...|{Sat, 31 Mar 2007...|
|             23| 23 pages, 3 figures|{Sun, 1 Apr 2007 ...|
|             11|            11 pages|{Sat, 31 Mar 2007...|
|              6|6 pages, 4 figure...|{Sat, 31 Mar 2007...|
|             16|16 pages, no figu...|{Sat, 31 Mar 2007...|
|              0|   Minor corrections|{Sat, 31 Mar 2007...|
|             36|36 pages, 17 figures|{Sat, 31 Mar 2007...|
|             14|14 pages; title c...|{Sat, 31 Mar 2007...|
|             18|  18 pages, 1 figure|{Sun, 1 Apr 2007 ...|
|             22|22 pages; signs a...|{Mon, 2 Apr 2007 ...|
|             17|17 pages, 3 figur...|{Sat, 31 Mar 2007...|
|             10|10 pages, 11 figu...|{Sat, 31 Mar 2007...|
|             20|20 pages, v2: an ...|{M

In [213]:
from pyspark.sql import functions as F

# Filtrar apenas a "versão 1" dos documentos
df_version1 = df.withColumn("version_1", F.expr("filter(versions, v -> v.version = 'v1')")) \
    .withColumn("version_1_created", F.expr("version_1[0].created"))

df_version1 = df_version1.withColumn(
    "created_date",
    F.trim(F.split(F.col("version_1_created"), ",")[0])
)

# Extrair o número de páginas do campo `comments` usando expressão regular
# A regex "\d+" captura a primeira sequência de dígitos antes da palavra "pages"
df_version1 = df_version1.withColumn(
    "num_pages",
    F.regexp_extract(F.col("comments"), r"(\d+)\s*pages", 1).cast("integer")
)

# Agrupar por data e calcular a média de páginas por dia
df_avg_pages_per_day = df_version1.groupBy("created_date") \
    .agg(F.avg("num_pages").alias("average_pages_per_day"))

# Exibir o resultado
df_avg_pages_per_day.show()



+------------+---------------------+
|created_date|average_pages_per_day|
+------------+---------------------+
|         Sun|   18.438584024595393|
|         Mon|    17.98149385325062|
|         Thu|    17.79319656963583|
|         Sat|    17.91937010713588|
|         Wed|    17.82846668144208|
|         Tue|   17.811519809804267|
|         Fri|   17.651086789289675|
+------------+---------------------+



                                                                                

In [186]:
df_avg.select(
    mean("number_of_pages")
).groupBy()



+--------------------+
|avg(number_of_pages)|
+--------------------+
|   325.5782004125984|
+--------------------+



                                                                                

In [179]:
df_avg.printSchema()

root
 |-- number_of_pages: integer (nullable = true)
 |-- version_filter: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- created: string (nullable = true)
 |    |    |-- version: string (nullable = true)



In [78]:
df.printSchema()

root
 |-- abstract: string (nullable = true)
 |-- authors: string (nullable = true)
 |-- authors_parsed: array (nullable = true)
 |    |-- element: array (containsNull = true)
 |    |    |-- element: string (containsNull = true)
 |-- categories: string (nullable = true)
 |-- comments: string (nullable = true)
 |-- doi: string (nullable = true)
 |-- id: string (nullable = true)
 |-- journal: string (nullable = true)
 |-- license: string (nullable = false)
 |-- report: string (nullable = true)
 |-- submitter: string (nullable = true)
 |-- title: string (nullable = true)
 |-- update_date: date (nullable = true)
 |-- versions: string (nullable = true)
 |-- element: string (nullable = true)
 |-- created: string (nullable = true)
 |-- version: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- created: string (nullable = true)
 |    |    |-- version: string (nullable = true)



In [32]:
df_infer.printSchema()

root
 |-- abstract: string (nullable = true)
 |-- authors: string (nullable = true)
 |-- authors_parsed: array (nullable = true)
 |    |-- element: array (containsNull = true)
 |    |    |-- element: string (containsNull = true)
 |-- categories: string (nullable = true)
 |-- comments: string (nullable = true)
 |-- doi: string (nullable = true)
 |-- id: string (nullable = true)
 |-- journal-ref: string (nullable = true)
 |-- license: string (nullable = true)
 |-- report-no: string (nullable = true)
 |-- submitter: string (nullable = true)
 |-- title: string (nullable = true)
 |-- update_date: string (nullable = true)
 |-- versions: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- created: string (nullable = true)
 |    |    |-- version: string (nullable = true)



In [20]:
df.rdd.getNumPartitions()

25