In [1]:
# One-time setup: fetch the CSV and stage it in HDFS. These lines are left commented
# so re-running the notebook does not re-download the file or hit paths that already
# exist (plain `-mkdir` / `-copyFromLocal` refuse to overwrite).
# !wget https://raw.githubusercontent.com/cstroup/files/main/index_data_isnotnull.csv
# !hdfs dfs -mkdir /stacko
# !hdfs dfs -copyFromLocal index_data_isnotnull.csv /stacko
# Confirm the input files are present in HDFS.
!hdfs dfs -ls /stacko

Found 4 items
-rw-r--r--   2 root hadoop     618865 2021-10-14 19:50 /stacko/index_data.csv
-rw-r--r--   2 root hadoop    2007303 2021-10-15 18:09 /stacko/index_data_isnotnull.csv
-rw-r--r--   2 root hadoop    3898645 2021-10-14 18:36 /stacko/stackoverflow_2008.csv
-rw-r--r--   2 root hadoop    5137762 2021-10-15 18:08 /stacko/stackoverflow_2008_isnotnull.csv


In [2]:
import pyspark
from pyspark import SparkContext, SparkConf
# from pyspark.rdd import RDD
from pyspark.sql import SparkSession
from pyspark.sql import functions as F

import pandas

import json

In [3]:
# Input CSV (columns Id, Tags) previously staged in HDFS under /stacko.
file = 'index_data_isnotnull.csv'
filepath = 'hdfs:///stacko/' + file  # absolute HDFS path, not the local working directory

print(file)
print(filepath)

# A PySpark kernel may already have started `spark` / `sc`; stop whichever exist so a
# fresh context with our own app name can be created. Looking them up in globals()
# keeps this cell runnable on a fresh plain-Python kernel, where neither name exists
# and the bare `sc.stop()` would raise NameError.
for _name in ("spark", "sc"):
    _existing = globals().get(_name)
    if _existing is not None:
        _existing.stop()
del _name, _existing

sc = SparkContext(appName="Stackoverflow")
spark = SparkSession(sparkContext=sc)

index_data_isnotnull.csv
hdfs:///stacko/index_data_isnotnull.csv


In [4]:
# Display the SparkContext created above to confirm the session is up.
sc

In [5]:
# Load the HDFS CSV into a Spark DataFrame: the first line carries the column
# names, and column types are inferred from the data.
csv_file = (
    spark.read
    .option("header", True)
    .option("inferSchema", True)
    .option("encoding", "UTF-8")
    .csv(filepath)
)

In [6]:
# Sanity check: spark.read.csv returns a Spark DataFrame, not a pandas one.
type(csv_file)

pyspark.sql.dataframe.DataFrame

In [7]:
# Preview the first 20 rows; show() truncates long Tags strings by default.
csv_file.show()

+---+--------------------+
| Id|                Tags|
+---+--------------------+
|  4|<c#><floating-poi...|
|  6|<html><css><inter...|
|  9|<c#><.net><datetime>|
| 11|<c#><datetime><ti...|
| 13|<html><browser><t...|
| 14|        <.net><math>|
| 16|<c#><linq><web-se...|
| 17|<mysql><database>...|
| 19|<performance><alg...|
| 24|<mysql><database>...|
| 25|<c++><c><sockets>...|
| 34|<arrays><actionsc...|
| 36|<sql><sql-server>...|
| 39|<c#><.net><vb.net...|
| 42|<php><plugins><ar...|
| 48|<html><forms><for...|
| 59|<c#><linq><.net-3.5>|
| 61|<mime><file-type>...|
| 66|    <linq><.net-3.5>|
| 72|        <ruby><rdoc>|
+---+--------------------+
only showing top 20 rows



In [8]:
# Remove every row that has a null in any column (how="any"). The "_isnotnull"
# file name suggests nulls were already filtered upstream, so this is a safety net.
csv_file = csv_file.dropna(how="any")
csv_file.show()

+---+--------------------+
| Id|                Tags|
+---+--------------------+
|  4|<c#><floating-poi...|
|  6|<html><css><inter...|
|  9|<c#><.net><datetime>|
| 11|<c#><datetime><ti...|
| 13|<html><browser><t...|
| 14|        <.net><math>|
| 16|<c#><linq><web-se...|
| 17|<mysql><database>...|
| 19|<performance><alg...|
| 24|<mysql><database>...|
| 25|<c++><c><sockets>...|
| 34|<arrays><actionsc...|
| 36|<sql><sql-server>...|
| 39|<c#><.net><vb.net...|
| 42|<php><plugins><ar...|
| 48|<html><forms><for...|
| 59|<c#><linq><.net-3.5>|
| 61|<mime><file-type>...|
| 66|    <linq><.net-3.5>|
| 72|        <ruby><rdoc>|
+---+--------------------+
only showing top 20 rows



In [9]:
# Collect the whole Spark DataFrame onto the driver as pandas so the tag strings can
# be parsed with plain Python/pandas; this requires the data to fit in driver memory.
df = csv_file.toPandas()

df

Unnamed: 0,Id,Tags
0,4,<c#><floating-point><type-conversion><double><...
1,6,<html><css><internet-explorer-7>
2,9,<c#><.net><datetime>
3,11,<c#><datetime><time><datediff><relative-time-s...
4,13,<html><browser><timezone><user-agent><timezone...
...,...,...
49995,355933,<c#>
49996,355934,<asp.net><iis><memory><virtual-memory>
49997,355937,<ruby-on-rails><search><sphinx><thinking-sphinx>
49998,355938,<apache-flex><compiler-construction><intellij-...


In [10]:
def parse_tags(raw):
    """Split a tag string such as '<c#><.net><datetime>' into ['c#', '.net', 'datetime'].

    Non-string values (missing values, or lists produced by an earlier run of this
    cell) are returned unchanged. This makes the cell safe to re-run: the previous
    chained `.str.replace` version turned already-parsed lists into NaN on a second run.
    """
    if not isinstance(raw, str):
        return raw
    # Plain str.replace is always literal, unlike Series.str.replace whose `regex`
    # default differs between pandas versions.
    return raw.replace('><', ' ').replace('>', '').replace('<', '').split()


df['Tags'] = df['Tags'].map(parse_tags)

df

Unnamed: 0,Id,Tags
0,4,"[c#, floating-point, type-conversion, double, ..."
1,6,"[html, css, internet-explorer-7]"
2,9,"[c#, .net, datetime]"
3,11,"[c#, datetime, time, datediff, relative-time-s..."
4,13,"[html, browser, timezone, user-agent, timezone..."
...,...,...
49995,355933,[c#]
49996,355934,"[asp.net, iis, memory, virtual-memory]"
49997,355937,"[ruby-on-rails, search, sphinx, thinking-sphinx]"
49998,355938,"[apache-flex, compiler-construction, intellij-..."


In [12]:
# One row per (Id, Tag) pair. explode() replaces the row-by-row iterrows loop, which
# is slow and raised TypeError on a missing (NaN) tag list.
# - Empty tag lists explode to NaN, so those rows are dropped: the original loop
#   emitted nothing for them either.
# - The index is reset to 0..n-1.
# - Id is cast to int64 to match the dtype the original list-built frame inferred.
df_final = (
    df[['Id', 'Tags']]
    .explode('Tags')
    .rename(columns={'Tags': 'Tag'})
    .dropna(subset=['Tag'])
    .astype({'Id': 'int64'})
    .reset_index(drop=True)
)

df_final

Unnamed: 0,Id,Tag
0,4,c#
1,4,floating-point
2,4,type-conversion
3,4,double
4,4,decimal
...,...,...
145753,355938,apache-flex
145754,355938,compiler-construction
145755,355938,intellij-idea
145756,355938,flexbuilder


In [13]:
# Hand the exploded (Id, Tag) pairs back to Spark for the grouping step.
df_spark = spark.createDataFrame(data=df_final)

df_spark.show(n=20)

+---+-------------------+
| Id|                Tag|
+---+-------------------+
|  4|                 c#|
|  4|     floating-point|
|  4|    type-conversion|
|  4|             double|
|  4|            decimal|
|  6|               html|
|  6|                css|
|  6|internet-explorer-7|
|  9|                 c#|
|  9|               .net|
|  9|           datetime|
| 11|                 c#|
| 11|           datetime|
| 11|               time|
| 11|           datediff|
| 11| relative-time-span|
| 13|               html|
| 13|            browser|
| 13|           timezone|
| 13|         user-agent|
+---+-------------------+
only showing top 20 rows



In [14]:
# Pin explicit column types and put Tag first, so each Row reads as a (tag, id) pair.
df_clean = df_spark.select(
    F.col("Tag").cast("string").alias("Tag"),
    F.col("Id").cast("integer").alias("Id"),
)
df_clean.printSchema()
df_clean.show(truncate=False)

root
 |-- Tag: string (nullable = true)
 |-- Id: integer (nullable = true)

+-------------------+---+
|Tag                |Id |
+-------------------+---+
|c#                 |4  |
|floating-point     |4  |
|type-conversion    |4  |
|double             |4  |
|decimal            |4  |
|html               |6  |
|css                |6  |
|internet-explorer-7|6  |
|c#                 |9  |
|.net               |9  |
|datetime           |9  |
|c#                 |11 |
|datetime           |11 |
|time               |11 |
|datediff           |11 |
|relative-time-span |11 |
|html               |13 |
|browser            |13 |
|timezone           |13 |
|user-agent         |13 |
+-------------------+---+
only showing top 20 rows



In [15]:
# Switch to the RDD API so pair-RDD operations (groupByKey) can be used on the rows.
rdd = df_clean.rdd

In [21]:
# Peek at the first five Row objects of the RDD.
rdd.take(5)

[Row(Tag='c#', Id=4),
 Row(Tag='floating-point', Id=4),
 Row(Tag='type-conversion', Id=4),
 Row(Tag='double', Id=4),
 Row(Tag='decimal', Id=4)]

In [16]:
# Inverted index: list of (tag, [Ids carrying that tag]) sorted by tag.
# - Key on row.Tag explicitly instead of relying on each Row being a positional
#   (Tag, Id) tuple, which silently breaks if the column order changes.
# - Sort each posting list: groupByKey makes no guarantee about the order of
#   values within a group.
index = sorted(
    rdd.map(lambda row: (row.Tag, row.Id))
    .groupByKey()
    .mapValues(sorted)
    .collect()
)

In [17]:
# collect() followed by sorted() leaves a plain Python list of (tag, ids) pairs on the driver.
type(index)

list

In [24]:
# Inspect the first entry, i.e. the smallest tag in sort order.
index[0]

('.emf', [152729, 236627])

In [18]:
# Preview the first two index entries as pretty-printed JSON before writing the full file.
dict_sample = {tag: ids for tag, ids in index[:2]}
json_sample = json.dumps(dict_sample, indent=4)
print(json_sample)

{
    ".emf": [
        152729,
        236627
    ],
    ".htaccess": [
        3157,
        8441,
        33751,
        33790,
        50931,
        59380,
        62384,
        72458,
        73123,
        75127,
        81631,
        113090,
        122097,
        142559,
        149236,
        152823,
        158384,
        184813,
        193160,
        193273,
        196500,
        200393,
        201602,
        209209,
        216019,
        234723,
        246398,
        257936,
        258680,
        259369,
        259372,
        260041,
        265898,
        273433,
        282161,
        282541,
        285383,
        286004,
        293285,
        294976,
        324381,
        324400,
        329145,
        331549,
        337519,
        338981,
        342509,
        353815
    ]
}


In [25]:
# Persist the whole tag -> [Ids] mapping as compact JSON. The relative path means the
# file lands in the notebook kernel's local working directory, not in HDFS.
dictionary = {tag: ids for tag, ids in index}
with open("inverted_index.json", mode="w") as outfile:
    outfile.write(json.dumps(dictionary))