# Using PySpark to anayze and retrieve data from HDFS

In [1]:
import os
from pyspark import SparkContext
from pyspark.sql import SparkSession, Row
from pyspark.sql.types import *
from pyspark.sql.functions import col, when
from pyspark.sql.functions import *
import hdfs
from io import BytesIO
import fastavro
from pyspark.sql.functions import udf
import time

In [2]:
import re

In [3]:
import findspark
findspark.init("./spark-2.4.4")

In [4]:
sc = SparkContext()
spark = SparkSession.builder.getOrCreate()

In [5]:
start_time = time.time()

In [6]:
# Load files
hdfs_client = hdfs.InsecureClient("http://0.0.0.0:50070")

#################################### Create a Dataframe from .sql files
rdd = sc.binaryFiles('hdfs://localhost:9000/data/article_links/master/full/sous-dataset-1/pagelink*.avro') 

# Parse avro files
nodes = rdd.flatMap(lambda args: fastavro.reader(BytesIO(args[1])))

# Convert to a resilient distributed dataset (RDD) of rows
rows = nodes.map(lambda node: Row(**node))

# Create a schema to define the type of each column
schema_type=StructType([StructField("pl_from", LongType(), False), StructField("pl_title", StringType(),False)])

# Convert to a Spark dataframe
df = spark.createDataFrame(rows, schema_type)

# Cache data to avoid re-computing everything
df.persist()

In [7]:
# le title we want to find authors writing most
TITLE = 'Makémaké'

In [8]:
def blank_as_null(x):
    return when(col(x) != "", col(x)).otherwise(None)

In [9]:
dfWithEmptyReplaced = df.withColumn("pl_title", blank_as_null("pl_title"))


In [10]:
dfNotNull = dfWithEmptyReplaced.filter(dfWithEmptyReplaced["pl_title"].isNotNull())
dfNotNull = dfNotNull.filter((dfNotNull.pl_title != "'!'")&(dfNotNull.pl_title != "'!!'")&(dfNotNull.pl_title != "'!!!'"))
dfNotNull.show(2)

+-------+--------------------+
|pl_from|            pl_title|
+-------+--------------------+
|7661715|'!!!!!!!!_/_Kimi_...|
|7942106|'!!!!!!!!_/_Kimi_...|
+-------+--------------------+
only showing top 2 rows



In [11]:
print((dfNotNull.count(), len(dfNotNull.columns)))

(1512304, 2)


In [12]:
try:
    counter_tag = dfNotNull.filter(dfNotNull.pl_title.contains(TITLE)).count()
    print("\nWe are seeking the number of pages where the title %s is pointed to" %TITLE)
    print("There are %d pages refered to the article" % counter_tag)
except:
    print("\nNo tag founded")


We are seeking the number of pages where the title Makémaké is pointed to
There are 41804 pages refered to the article


In [13]:
pages_pointed_to_title = dfNotNull.filter(dfNotNull.pl_title.contains(TITLE)).select("pl_from").collect()
pages_pointed_to_title =[r.pl_from for r in pages_pointed_to_title]

In [14]:
pages_pointed_to_title[:5]

[11936587, 11936589, 11936591, 11936593, 11936595]

In [15]:
len(pages_pointed_to_title)

41804

In [16]:
from pyspark.sql.functions import udf

In [17]:
F = udf(lambda x: 'pointed' if x in pages_pointed_to_title else x, StringType())
k = dfNotNull.withColumn("new_pl_from",F(dfNotNull["pl_from"]))

In [18]:
k.filter(k.pl_title.contains(TITLE)).show()

+--------+-------------------+-----------+
| pl_from|           pl_title|new_pl_from|
+--------+-------------------+-----------+
|11936587|'(136472)_Makémaké'|    pointed|
|11936589|'(136472)_Makémaké'|    pointed|
|11936591|'(136472)_Makémaké'|    pointed|
|11936593|'(136472)_Makémaké'|    pointed|
|11936595|'(136472)_Makémaké'|    pointed|
|11936597|'(136472)_Makémaké'|    pointed|
|11936599|'(136472)_Makémaké'|    pointed|
|11936601|'(136472)_Makémaké'|    pointed|
|11936603|'(136472)_Makémaké'|    pointed|
|11936605|'(136472)_Makémaké'|    pointed|
|11936607|'(136472)_Makémaké'|    pointed|
|11936609|'(136472)_Makémaké'|    pointed|
|11936611|'(136472)_Makémaké'|    pointed|
|11936613|'(136472)_Makémaké'|    pointed|
|11936615|'(136472)_Makémaké'|    pointed|
|11936617|'(136472)_Makémaké'|    pointed|
|11936619|'(136472)_Makémaké'|    pointed|
|11936621|'(136472)_Makémaké'|    pointed|
|11936623|'(136472)_Makémaké'|    pointed|
|11936625|'(136472)_Makémaké'|    pointed|
+--------+-

In [19]:
def parse(row):
    word1 = " ".join(re.findall("[a-zA-Z]+", row))
    return word1

In [20]:
F1 = udf(parse)

In [21]:
k1 = k.withColumn("pl_title", F1("pl_title"))

In [22]:
k2= k1.filter(k1['pl_title'] != '')

In [23]:
k2.show()

+--------+------------------+-----------+
| pl_from|          pl_title|new_pl_from|
+--------+------------------+-----------+
| 7661715|Kimi to Iu Kasetsu|    7661715|
| 7942106|Kimi to Iu Kasetsu|    7942106|
|  759182|          Fuck You|     759182|
| 3923775|          Fuck You|    3923775|
|  351979|             album|     351979|
|11632317|             album|   11632317|
|  903058|              Bang|     903058|
| 7671720|           Deladap|    7671720|
|  405075|              Huff|     405075|
|  574718|                 K|     574718|
|  713351|                 K|     713351|
| 1155717|                 K|    1155717|
| 1327190|                 K|    1327190|
| 1783586|                 K|    1783586|
| 6114565|                 K|    6114565|
| 6117942|                 K|    6117942|
| 3959135|         K Records|    3959135|
| 4940931|         K Records|    4940931|
| 2977893|         Kai Garib|    2977893|
|  162147|             Karas|     162147|
+--------+------------------+-----

In [24]:
titles_pointed_to_title = k2.filter(k['new_pl_from'] == "pointed").select('pl_title').distinct()

In [25]:
titles_pointed_to_title = titles_pointed_to_title.select(col("pl_title").alias("title"))

In [26]:
titles_pointed_to_title.show(2)

+-------+
|  title|
+-------+
|Tjelvar|
|     XU|
+-------+
only showing top 2 rows



In [27]:
#################################### Create a Dataframe from .xmle files
rdd1 = sc.binaryFiles('hdfs://localhost:9000/data/article_links/master/full/sous-dataset-1/frwiki*.avro') 

# Parse avro files
nodes1 = rdd1.flatMap(lambda args: fastavro.reader(BytesIO(args[1])))

# Convert to a resilient distributed dataset (RDD) of rows
rows1 = nodes1.map(lambda node: Row(**node))

# Create a schema to define the type of each column
schema_type1=StructType([StructField("title", StringType(), False), StructField("contributors", ArrayType(StringType()),False)])


In [28]:
# Convert to a Spark dataframe
df1 = spark.createDataFrame(rows1, schema_type1)
df1.show()

+--------------------+--------------------+
|               title|        contributors|
+--------------------+--------------------+
|     Antoine Meillet|[Curry, script de...|
|    Algèbre linéaire|[curie.noos.net, ...|
|    Algèbre générale|[Youssefsan, Sept...|
|       Algorithmique|[Valéry Beaud, ot...|
|Politique en Arge...|[Youssefsan, Yous...|
|Armée républicain...|[cache7.ihug.com....|
|            Autriche|[Alex, Curry, scr...|
|            Autriche|[DocteurCosmos, S...|
|Arc de triomphe d...|[host213-1-132-15...|
|Arc de triomphe d...|[Celette, Gkml, L...|
|        Arsène Lupin|[Buzz, Valéry Bea...|
|          Algorithme|[Valéry Beaud, sc...|
|  Sigles en médecine|[Elisa, Elisa, El...|
|Aux couleurs du M...|[Valéry Beaud, sc...|
|         Afghanistan|[proxy-4v.club-in...|
|         Afghanistan|[Soig, Soig, Soig...|
|Algèbre de Boole ...|[Mathic, titan.ac...|
|Algèbre de Boole ...|[Erasoft24, Titib...|
|       Ada (langage)|[Valéry Beaud, Po...|
|            Auvergne|[Rinaldum,

In [29]:
print((df1.count(), len(df1.columns)))

(839, 2)


In [30]:
dfWithEmptyReplaced1 = df1.withColumn("title", blank_as_null("title"))
#dfWithEmptyReplaced1.persist()
dfWithEmptyReplaced1.show()

+--------------------+--------------------+
|               title|        contributors|
+--------------------+--------------------+
|     Antoine Meillet|[Curry, script de...|
|    Algèbre linéaire|[curie.noos.net, ...|
|    Algèbre générale|[Youssefsan, Sept...|
|       Algorithmique|[Valéry Beaud, ot...|
|Politique en Arge...|[Youssefsan, Yous...|
|Armée républicain...|[cache7.ihug.com....|
|            Autriche|[Alex, Curry, scr...|
|            Autriche|[DocteurCosmos, S...|
|Arc de triomphe d...|[host213-1-132-15...|
|Arc de triomphe d...|[Celette, Gkml, L...|
|        Arsène Lupin|[Buzz, Valéry Bea...|
|          Algorithme|[Valéry Beaud, sc...|
|  Sigles en médecine|[Elisa, Elisa, El...|
|Aux couleurs du M...|[Valéry Beaud, sc...|
|         Afghanistan|[proxy-4v.club-in...|
|         Afghanistan|[Soig, Soig, Soig...|
|Algèbre de Boole ...|[Mathic, titan.ac...|
|Algèbre de Boole ...|[Erasoft24, Titib...|
|       Ada (langage)|[Valéry Beaud, Po...|
|            Auvergne|[Rinaldum,

In [31]:
dfWithEmptyReplaced1.filter(dfWithEmptyReplaced1["title"].isNotNull()).show()

+--------------------+--------------------+
|               title|        contributors|
+--------------------+--------------------+
|     Antoine Meillet|[Curry, script de...|
|    Algèbre linéaire|[curie.noos.net, ...|
|    Algèbre générale|[Youssefsan, Sept...|
|       Algorithmique|[Valéry Beaud, ot...|
|Politique en Arge...|[Youssefsan, Yous...|
|Armée républicain...|[cache7.ihug.com....|
|            Autriche|[Alex, Curry, scr...|
|            Autriche|[DocteurCosmos, S...|
|Arc de triomphe d...|[host213-1-132-15...|
|Arc de triomphe d...|[Celette, Gkml, L...|
|        Arsène Lupin|[Buzz, Valéry Bea...|
|          Algorithme|[Valéry Beaud, sc...|
|  Sigles en médecine|[Elisa, Elisa, El...|
|Aux couleurs du M...|[Valéry Beaud, sc...|
|         Afghanistan|[proxy-4v.club-in...|
|         Afghanistan|[Soig, Soig, Soig...|
|Algèbre de Boole ...|[Mathic, titan.ac...|
|Algèbre de Boole ...|[Erasoft24, Titib...|
|       Ada (langage)|[Valéry Beaud, Po...|
|            Auvergne|[Rinaldum,

In [32]:
print((dfWithEmptyReplaced1.count(), len(dfWithEmptyReplaced1.columns)))

(839, 2)


In [33]:
print("\nGroup by title and concatenate the lists of contributors")

df_concat1 = dfWithEmptyReplaced1.rdd.map(lambda x: (x.title,x.contributors)).reduceByKey(lambda x,y:x+y).toDF(['title','contributors'])



Group by title and concatenate the lists of contributors


In [34]:
df_concat1.show(1)

+---------------+--------------------+
|          title|        contributors|
+---------------+--------------------+
|Antoine Meillet|[Curry, script de...|
+---------------+--------------------+
only showing top 1 row



In [35]:
DF = titles_pointed_to_title.join(df_concat1,"title", "inner")

In [36]:
DF.select(explode('contributors').alias("value")).groupBy("value").count().orderBy(desc("count")).show()

+-----------------+-----+
|            value|count|
+-----------------+-----+
|       Jonathan71|  168|
|           EhOuiH|  107|
|        Maxam1392|   93|
|            Crom1|   59|
|        Ghost dog|   57|
|             Ben2|   55|
|          Pj44300|   49|
|          Salebot|   45|
|           MedBot|   38|
|       Orthogaffe|   37|
|          Aoineko|   36|
|     Huguespotter|   35|
|           SieBot|   34|
|       Goliadkine|   34|
|            Aflis|   33|
|       Bibliorock|   32|
|        Néfermaât|   30|
|            Iniți|   29|
|            Xqbot|   28|
|Loup Solitaire 81|   27|
+-----------------+-----+
only showing top 20 rows



In [37]:
sc.stop()

In [38]:
print("--- %s seconds ---" % (time.time() - start_time))

--- 1231.4669077396393 seconds ---
