# Inverted Indexing

In [1]:
import pandas as pd
import numpy as np

# 🔹 Step 1: Build a toy corpus of 10 short documents
corpus_texts = [
    "the quick brown fox",
    "jumps over the lazy dog",
    "never jump over the lazy dog quickly",
    "bright sun in the blue sky",
    "the fox is quick and clever",
    "big data means big power",
    "data science and machine learning",
    "machine learning is fun",
    "fun with pandas and numpy",
    "numpy and pandas are great",
]

# doc_id is a 1-based identifier assigned in corpus order
docs = pd.DataFrame({
    'doc_id': np.arange(1, len(corpus_texts) + 1),
    'content': corpus_texts,
})

print("▶️ Documents:")
print(docs)

▶️ Documents:
   doc_id                               content
0       1                   the quick brown fox
1       2               jumps over the lazy dog
2       3  never jump over the lazy dog quickly
3       4            bright sun in the blue sky
4       5           the fox is quick and clever
5       6              big data means big power
6       7     data science and machine learning
7       8               machine learning is fun
8       9             fun with pandas and numpy
9      10            numpy and pandas are great


In [2]:
# 🔹 Step 2: Map phase - tokenize and emit (word, doc_id)
# Walk the two columns in parallel; each document is lower-cased and split on
# whitespace, and every token yields one {'word', 'doc_id'} record.
mapped = [
    {'word': token, 'doc_id': current_id}
    for current_id, text in zip(docs['doc_id'], docs['content'])
    for token in text.lower().split()
]

# One row per emitted pair, in document order
mapped_df = pd.DataFrame(mapped)

print("\n▶️ Map Phase Output:")
print(mapped_df.head(20))



▶️ Map Phase Output:
       word  doc_id
0       the       1
1     quick       1
2     brown       1
3       fox       1
4     jumps       2
5      over       2
6       the       2
7      lazy       2
8       dog       2
9     never       3
10     jump       3
11     over       3
12      the       3
13     lazy       3
14      dog       3
15  quickly       3
16   bright       4
17      sun       4
18       in       4
19      the       4


In [3]:
# 🔹 Step 3: Shuffle phase - group by word
# Gather every doc_id emitted for the same word into one list per word.
# Duplicates are still present at this stage (e.g. a word repeated in one doc).
grouped = (
    mapped_df
    .groupby('word')['doc_id']
    .apply(list)
    .reset_index()
)

print("\n▶️ Shuffle Phase Output (partial):")
print(grouped.head(10))



▶️ Shuffle Phase Output (partial):
     word         doc_id
0     and  [5, 7, 9, 10]
1     are           [10]
2     big         [6, 6]
3    blue            [4]
4  bright            [4]
5   brown            [1]
6  clever            [5]
7    data         [6, 7]
8     dog         [2, 3]
9     fox         [1, 5]


In [4]:
# 🔹 Step 4: Reduce phase - unique doc IDs per word
# Collapse repeated ids into a sorted list of distinct ids per word.
# The raw 'doc_id' column is kept next to 'doc_ids' so both can be compared.
grouped['doc_ids'] = grouped['doc_id'].apply(lambda ids: sorted(set(ids)))

print("\n▶️ Final Inverted Index:")
print(grouped)


▶️ Final Inverted Index:
        word           doc_id          doc_ids
0        and    [5, 7, 9, 10]    [5, 7, 9, 10]
1        are             [10]             [10]
2        big           [6, 6]              [6]
3       blue              [4]              [4]
4     bright              [4]              [4]
5      brown              [1]              [1]
6     clever              [5]              [5]
7       data           [6, 7]           [6, 7]
8        dog           [2, 3]           [2, 3]
9        fox           [1, 5]           [1, 5]
10       fun           [8, 9]           [8, 9]
11     great             [10]             [10]
12        in              [4]              [4]
13        is           [5, 8]           [5, 8]
14      jump              [3]              [3]
15     jumps              [2]              [2]
16      lazy           [2, 3]           [2, 3]
17  learning           [7, 8]           [7, 8]
18   machine           [7, 8]           [7, 8]
19     means              [6]     

In [5]:
# Search query
query = "the dog"
tokens = query.lower().split()

print("\n🔍 Searching for:", tokens)

# Lookup inverted index
# Build the lookup from the de-duplicated 'doc_ids' column produced by the
# reduce phase, not the raw 'doc_id' lists, which can repeat an id
# (e.g. 'big' -> [6, 6]).
inverted_index = dict(zip(grouped['word'], grouped['doc_ids']))

# Collect doc lists for each token; an unknown token contributes an empty
# posting list, which makes the AND result empty.
doc_sets = []
for token in tokens:
    doc_list = inverted_index.get(token, [])
    print(f"Word '{token}' found in docs: {doc_list}")
    doc_sets.append(set(doc_list))

# Intersection (AND search); an empty query has no postings to intersect
relevant_docs = set.intersection(*doc_sets) if doc_sets else set()

print("\n✅ Relevant docs (AND search):", relevant_docs)

# Show matched content
print("\n📄 Matching Documents:")
print(docs[docs['doc_id'].isin(relevant_docs)])


🔍 Searching for: ['the', 'dog']
Word 'the' found in docs: [1, 2, 3, 4, 5]
Word 'dog' found in docs: [2, 3]

✅ Relevant docs (AND search): {2, 3}

📄 Matching Documents:
   doc_id                               content
1       2               jumps over the lazy dog
2       3  never jump over the lazy dog quickly


# Apache Spark

In [6]:
!pip install pyspark py4j




<!-- #include<stdio.h> -->

<!--
public static void main args {

} -->



In [4]:
# SparkSession is the entry point used by the DataFrame cells in this notebook
from pyspark.sql import SparkSession

# Reuse the running session if there is one, otherwise start a new one with
# the "local[*]" master
spark = (
    SparkSession.builder
    .master("local[*]")
    .getOrCreate()
)

# Leave the session as the last expression so the notebook displays it
spark

In [5]:
!curl https://raw.githubusercontent.com/markumreed/colab_pyspark/main/WMT.csv >> WMT.csv


  % Total    % Received % Xferd  Average Speed   Time    Time     Time  Current
                                 Dload  Upload   Total   Spent    Left  Speed
  0     0    0     0    0     0      0      0 --:--:-- --:--:-- --:--:--     0100 89556  100 89556    0     0   392k      0 --:--:-- --:--:-- --:--:--  393k


In [6]:
import pandas as pd

# Parse the downloaded CSV with pandas, then print a summary of the resulting frame
pd_df = pd.read_csv(filepath_or_buffer="WMT.csv")
pd_df.info()


ParserError: Error tokenizing data. C error: Expected 7 fields in line 1260, saw 13


In [None]:
# Preview the first rows of the pandas frame (requires the read_csv cell to have succeeded)
pd_df.head()

In [7]:
# Load the same CSV as a Spark DataFrame: the first line supplies the column
# names and Spark infers each column's type from the data.
df = spark.read.csv(
    'WMT.csv',
    header=True,
    inferSchema=True,
)

# Peek at the first 10 rows, returned as a list of Row objects
df.head(10)

[Row(Date=datetime.date(2016, 1, 20), Open=61.799999, High=62.330002, Low=60.200001, Close=60.84, Adj Close=53.990601, Volume='17369100'),
 Row(Date=datetime.date(2016, 1, 21), Open=60.98, High=62.790001, Low=60.91, Close=61.880001, Adj Close=54.913509, Volume='12089200'),
 Row(Date=datetime.date(2016, 1, 22), Open=62.439999, High=63.259998, Low=62.130001, Close=62.689999, Adj Close=55.632324, Volume='9197500'),
 Row(Date=datetime.date(2016, 1, 25), Open=62.779999, High=63.82, Low=62.549999, Close=63.450001, Adj Close=56.306763, Volume='12823400'),
 Row(Date=datetime.date(2016, 1, 26), Open=63.360001, High=64.470001, Low=63.259998, Close=64.0, Adj Close=56.794834, Volume='9441200'),
 Row(Date=datetime.date(2016, 1, 27), Open=64.099998, High=65.18, Low=63.889999, Close=63.950001, Adj Close=56.750477, Volume='10214300'),
 Row(Date=datetime.date(2016, 1, 28), Open=64.029999, High=64.510002, Low=63.43, Close=64.220001, Adj Close=56.99007, Volume='11278300'),
 Row(Date=datetime.date(2016, 1

In [8]:
# List the column names of the Spark DataFrame
df.columns

['Date', 'Open', 'High', 'Low', 'Close', 'Adj Close', 'Volume']

In [9]:
# Print each column's name, inferred type and nullability
df.printSchema()

root
 |-- Date: date (nullable = true)
 |-- Open: double (nullable = true)
 |-- High: double (nullable = true)
 |-- Low: double (nullable = true)
 |-- Close: double (nullable = true)
 |-- Adj Close: double (nullable = true)
 |-- Volume: string (nullable = true)



In [10]:
# Print a tabular preview of the DataFrame's rows
df.show()

+----------+---------+---------+---------+---------+---------+--------+
|      Date|     Open|     High|      Low|    Close|Adj Close|  Volume|
+----------+---------+---------+---------+---------+---------+--------+
|2016-01-20|61.799999|62.330002|60.200001|    60.84|53.990601|17369100|
|2016-01-21|    60.98|62.790001|    60.91|61.880001|54.913509|12089200|
|2016-01-22|62.439999|63.259998|62.130001|62.689999|55.632324| 9197500|
|2016-01-25|62.779999|    63.82|62.549999|63.450001|56.306763|12823400|
|2016-01-26|63.360001|64.470001|63.259998|     64.0|56.794834| 9441200|
|2016-01-27|64.099998|    65.18|63.889999|63.950001|56.750477|10214300|
|2016-01-28|64.029999|64.510002|    63.43|64.220001| 56.99007|11278300|
|2016-01-29|    64.75|66.529999|64.739998|66.360001|58.889149|16439100|
|2016-02-01|65.910004|    67.93|65.889999|     67.5| 59.90081|14728400|
|2016-02-02|67.300003|67.839996|66.279999|66.860001|59.332867|13585900|
|2016-02-03|67.309998|     67.5|    65.07|66.269997| 58.80928|12

In [11]:
# Show summary statistics (count, mean, stddev, min, max) per column.
# NOTE(review): min/max for a string-typed column compare as text, not numerically.
df.describe().show()

+-------+-----------------+------------------+------------------+-----------------+------------------+-----------------+
|summary|             Open|              High|               Low|            Close|         Adj Close|           Volume|
+-------+-----------------+------------------+------------------+-----------------+------------------+-----------------+
|  count|             2518|              2518|              2518|             2518|              2518|             2518|
|   mean|96.50830017156473| 97.33101670611606| 95.74480543367737|  96.548617969023| 92.42759966243037|8509442.510925705|
| stddev|23.32285199736492|23.585851735628015|23.015135524430754|23.28824146387443|25.330248130379083|4760469.149170408|
|    min|            60.98|         62.330002|         60.200001|            60.84|         53.990601|         10010500|
|    max|       153.600006|        153.660004|        151.660004|       152.789993|        152.233536|          9999600|
+-------+-----------------+-----

In [12]:
# Keep only the rows whose Close value is below 62 (SQL-style string predicate) and print them
df.filter('Close < 62').show()

+----------+---------+---------+---------+---------+---------+--------+
|      Date|     Open|     High|      Low|    Close|Adj Close|  Volume|
+----------+---------+---------+---------+---------+---------+--------+
|2016-01-20|61.799999|62.330002|60.200001|    60.84|53.990601|17369100|
|2016-01-21|    60.98|62.790001|    60.91|61.880001|54.913509|12089200|
|2016-01-20|61.799999|62.330002|60.200001|    60.84|53.990601|17369100|
|2016-01-21|    60.98|62.790001|    60.91|61.880001|54.913509|12089200|
+----------+---------+---------+---------+---------+---------+--------+



In [13]:
# Same filter, projected down to the Open column. Without a trailing .show()
# the cell only displays the resulting DataFrame's repr, not its rows.
df.filter('Close < 62').select('Open')  # append .show() to print the rows

DataFrame[Open: double]

In [25]:
# Inspect the Python type of the object returned by spark.read.csv
type(df)

In [30]:
# Take the DataFrame's `.rdd` attribute so its partitioning can be inspected
rdd_obj = df.rdd

In [31]:
# Report how many partitions the RDD currently has
rdd_obj.getNumPartitions()

1

In [31]:
# Ask for the data to be redistributed over 4 partitions; the result is bound
# to a new name and rdd_obj itself is not reassigned.
new_rdd = rdd_obj.repartition(4)

# Confirm the partition count of the repartitioned RDD
new_rdd.getNumPartitions()

4

In [25]:
# Plain Python list holding the integers 0..99, used as input for the timing cells
numbers = [*range(100)]

In [26]:
%time
res=filter(lambda i: i % 2,numbers)

CPU times: user 23 µs, sys: 0 ns, total: 23 µs
Wall time: 27.9 µs


In [27]:
from pyspark import SparkContext
# Obtain the SparkContext via getOrCreate() rather than constructing one directly
sc = SparkContext.getOrCreate()

In [28]:
# Turn the local Python list `numbers` into an RDD
RDD_example = sc.parallelize(numbers)


In [29]:
# Report how many partitions the parallelized list was split into
RDD_example.getNumPartitions()

2

In [22]:
%time
o = RDD_example.filter(lambda i: i % 2)

CPU times: user 2 µs, sys: 1 µs, total: 3 µs
Wall time: 5.01 µs
