# Inverted Indexing

In [None]:
import pandas as pd
import numpy as np

# 🔹 Step 1: Build a toy corpus of 10 short documents
docs = pd.DataFrame({
    'content': [
        "the quick brown fox",
        "jumps over the lazy dog",
        "never jump over the lazy dog quickly",
        "bright sun in the blue sky",
        "the fox is quick and clever",
        "big data means big power",
        "data science and machine learning",
        "machine learning is fun",
        "fun with pandas and numpy",
        "numpy and pandas are great",
    ]
})
# Number the documents 1..N and put the id in the first column
docs.insert(0, 'doc_id', np.arange(1, len(docs) + 1))

print("▶️ Documents:", docs, sep="\n")

▶️ Documents:
   doc_id                               content
0       1                   the quick brown fox
1       2               jumps over the lazy dog
2       3  never jump over the lazy dog quickly
3       4            bright sun in the blue sky
4       5           the fox is quick and clever
5       6              big data means big power
6       7     data science and machine learning
7       8               machine learning is fun
8       9             fun with pandas and numpy
9      10            numpy and pandas are great


In [None]:
# 🔹 Step 2: Map phase - lower-case and whitespace-split every document,
# emitting one {'word', 'doc_id'} record per token (duplicates included)
mapped = [
    {'word': token, 'doc_id': doc_id}
    for doc_id, text in zip(docs['doc_id'], docs['content'])
    for token in text.lower().split()
]

mapped_df = pd.DataFrame(mapped)

print("\n▶️ Map Phase Output:", mapped_df.head(20), sep="\n")



▶️ Map Phase Output:
       word  doc_id
0       the       1
1     quick       1
2     brown       1
3       fox       1
4     jumps       2
5      over       2
6       the       2
7      lazy       2
8       dog       2
9     never       3
10     jump       3
11     over       3
12      the       3
13     lazy       3
14      dog       3
15  quickly       3
16   bright       4
17      sun       4
18       in       4
19      the       4


In [None]:
# 🔹 Step 3: Shuffle phase - bring all doc ids emitted for the same word together
# (one row per word; the list still contains repeats at this point)
grouped = (
    mapped_df
    .groupby('word')['doc_id']
    .agg(list)
    .reset_index()
)

print("\n▶️ Shuffle Phase Output (partial):", grouped.head(10), sep="\n")


▶️ Shuffle Phase Output (partial):
     word         doc_id
0     and  [5, 7, 9, 10]
1     are           [10]
2     big         [6, 6]
3    blue            [4]
4  bright            [4]
5   brown            [1]
6  clever            [5]
7    data         [6, 7]
8     dog         [2, 3]
9     fox         [1, 5]


In [None]:
# 🔹 Step 4: Reduce phase - collapse each word's postings to sorted, unique doc ids.
# The raw `doc_id` column is kept alongside the new `doc_ids` column.
grouped['doc_ids'] = grouped['doc_id'].map(lambda ids: sorted(set(ids)))

print("\n▶️ Final Inverted Index:", grouped, sep="\n")


▶️ Final Inverted Index:
        word           doc_id          doc_ids
0        and    [5, 7, 9, 10]    [5, 7, 9, 10]
1        are             [10]             [10]
2        big           [6, 6]              [6]
3       blue              [4]              [4]
4     bright              [4]              [4]
5      brown              [1]              [1]
6     clever              [5]              [5]
7       data           [6, 7]           [6, 7]
8        dog           [2, 3]           [2, 3]
9        fox           [1, 5]           [1, 5]
10       fun           [8, 9]           [8, 9]
11     great             [10]             [10]
12        in              [4]              [4]
13        is           [5, 8]           [5, 8]
14      jump              [3]              [3]
15     jumps              [2]              [2]
16      lazy           [2, 3]           [2, 3]
17  learning           [7, 8]           [7, 8]
18   machine           [7, 8]           [7, 8]
19     means              [6]     

In [None]:
# Search query (AND semantics: a document must contain every query token)
query = "the dog"
tokens = query.lower().split()

print("\n🔍 Searching for:", tokens)

# Lookup inverted index.
# Use the reduced `doc_ids` column (sorted, deduplicated) rather than the raw
# shuffle output in `doc_id`, which repeats a document once per occurrence of
# the word (e.g. "big" -> [6, 6]).
inverted_index = dict(zip(grouped['word'], grouped['doc_ids']))

# Collect doc lists for each token; a word missing from the index contributes
# an empty set, which makes the AND result empty
doc_sets = []
for token in tokens:
    doc_list = inverted_index.get(token, [])
    print(f"Word '{token}' found in docs: {doc_list}")
    doc_sets.append(set(doc_list))

# Intersection (AND search). set.intersection() needs at least one argument,
# so an empty query is handled separately and matches nothing.
relevant_docs = set.intersection(*doc_sets) if doc_sets else set()

print("\n✅ Relevant docs (AND search):", relevant_docs)

# Show matched content
print("\n📄 Matching Documents:")
print(docs[docs['doc_id'].isin(relevant_docs)])


🔍 Searching for: ['the', 'dog']
Word 'the' found in docs: [1, 2, 3, 4, 5]
Word 'dog' found in docs: [2, 3]

✅ Relevant docs (AND search): {2, 3}

📄 Matching Documents:
   doc_id                               content
1       2               jumps over the lazy dog
2       3  never jump over the lazy dog quickly


In [None]:
# Search query (AND semantics: a document must contain every query token).
# Both words exist in the corpus but never in the same document, so no match is expected.
query = "great dog"
tokens = query.lower().split()

print("\n🔍 Searching for:", tokens)

# Lookup inverted index.
# Use the reduced `doc_ids` column (sorted, deduplicated) rather than the raw
# shuffle output in `doc_id`, which repeats a document once per occurrence of
# the word (e.g. "big" -> [6, 6]).
inverted_index = dict(zip(grouped['word'], grouped['doc_ids']))

# Collect doc lists for each token; a word missing from the index contributes
# an empty set, which makes the AND result empty
doc_sets = []
for token in tokens:
    doc_list = inverted_index.get(token, [])
    print(f"Word '{token}' found in docs: {doc_list}")
    doc_sets.append(set(doc_list))

# Intersection (AND search). set.intersection() needs at least one argument,
# so an empty query is handled separately and matches nothing.
relevant_docs = set.intersection(*doc_sets) if doc_sets else set()

print("\n✅ Relevant docs (AND search):", relevant_docs)

# Show matched content
print("\n📄 Matching Documents:")
print(docs[docs['doc_id'].isin(relevant_docs)])


🔍 Searching for: ['great', 'dog']
Word 'great' found in docs: [10]
Word 'dog' found in docs: [2, 3]

✅ Relevant docs (AND search): set()

📄 Matching Documents:
Empty DataFrame
Columns: [doc_id, content]
Index: []


# Apache Spark

In [8]:
!pip install pyspark py4j



In [9]:
# Analogy with other languages: `#include <stdio.h>` / `#include <iostream>` in C/C++,
# or the `public static void main` boilerplate in Java -- the import and session
# setup below are the equivalent starting boilerplate for PySpark.

# Import SparkSession
from pyspark.sql import SparkSession

# Create a Spark session (or reuse the active one) running locally;
# "local[*]" uses as many worker threads as there are logical cores.
spark = (
    SparkSession.builder
    .master("local[*]")
    .getOrCreate()
)

# Check Spark session information
spark

In [10]:
# Display the SparkSession object again.
spark

In [11]:
!curl https://raw.githubusercontent.com/markumreed/colab_pyspark/main/WMT.csv >> WMT.csv


  % Total    % Received % Xferd  Average Speed   Time    Time     Time  Current
                                 Dload  Upload   Total   Spent    Left  Speed
100 89556  100 89556    0     0   358k      0 --:--:-- --:--:-- --:--:--  359k


In [12]:
import pandas as pd

# Load the CSV into a pandas DataFrame and summarise its columns, non-null counts and dtypes.
pd_df = pd.read_csv("WMT.csv")
pd_df.info()


<class 'pandas.core.frame.DataFrame'>
RangeIndex: 1259 entries, 0 to 1258
Data columns (total 7 columns):
 #   Column     Non-Null Count  Dtype  
---  ------     --------------  -----  
 0   Date       1259 non-null   object 
 1   Open       1259 non-null   float64
 2   High       1259 non-null   float64
 3   Low        1259 non-null   float64
 4   Close      1259 non-null   float64
 5   Adj Close  1259 non-null   float64
 6   Volume     1259 non-null   int64  
dtypes: float64(5), int64(1), object(1)
memory usage: 69.0+ KB


In [13]:
# Preview the first five rows of the pandas DataFrame.
pd_df.head()

Unnamed: 0,Date,Open,High,Low,Close,Adj Close,Volume
0,2016-01-20,61.799999,62.330002,60.200001,60.84,53.990601,17369100
1,2016-01-21,60.98,62.790001,60.91,61.880001,54.913509,12089200
2,2016-01-22,62.439999,63.259998,62.130001,62.689999,55.632324,9197500
3,2016-01-25,62.779999,63.82,62.549999,63.450001,56.306763,12823400
4,2016-01-26,63.360001,64.470001,63.259998,64.0,56.794834,9441200


In [18]:
# Read the CSV with Spark's default options: the header line is not recognised,
# so columns get the generated names _c0.._c6 and the header comes back as the first data row.
df = spark.read.csv('WMT.csv')
df.head(10)

[Row(_c0='Date', _c1='Open', _c2='High', _c3='Low', _c4='Close', _c5='Adj Close', _c6='Volume'),
 Row(_c0='2016-01-20', _c1='61.799999', _c2='62.330002', _c3='60.200001', _c4='60.840000', _c5='53.990601', _c6='17369100'),
 Row(_c0='2016-01-21', _c1='60.980000', _c2='62.790001', _c3='60.910000', _c4='61.880001', _c5='54.913509', _c6='12089200'),
 Row(_c0='2016-01-22', _c1='62.439999', _c2='63.259998', _c3='62.130001', _c4='62.689999', _c5='55.632324', _c6='9197500'),
 Row(_c0='2016-01-25', _c1='62.779999', _c2='63.820000', _c3='62.549999', _c4='63.450001', _c5='56.306763', _c6='12823400'),
 Row(_c0='2016-01-26', _c1='63.360001', _c2='64.470001', _c3='63.259998', _c4='64.000000', _c5='56.794834', _c6='9441200'),
 Row(_c0='2016-01-27', _c1='64.099998', _c2='65.180000', _c3='63.889999', _c4='63.950001', _c5='56.750477', _c6='10214300'),
 Row(_c0='2016-01-28', _c1='64.029999', _c2='64.510002', _c3='63.430000', _c4='64.220001', _c5='56.990070', _c6='11278300'),
 Row(_c0='2016-01-29', _c1='64

In [19]:
# Print the first 5 rows as a table (the header line is still treated as data here).
df.show(5)

+----------+---------+---------+---------+---------+---------+--------+
|       _c0|      _c1|      _c2|      _c3|      _c4|      _c5|     _c6|
+----------+---------+---------+---------+---------+---------+--------+
|      Date|     Open|     High|      Low|    Close|Adj Close|  Volume|
|2016-01-20|61.799999|62.330002|60.200001|60.840000|53.990601|17369100|
|2016-01-21|60.980000|62.790001|60.910000|61.880001|54.913509|12089200|
|2016-01-22|62.439999|63.259998|62.130001|62.689999|55.632324| 9197500|
|2016-01-25|62.779999|63.820000|62.549999|63.450001|56.306763|12823400|
+----------+---------+---------+---------+---------+---------+--------+
only showing top 5 rows



In [21]:
# header=True takes the column names from the first line of the file;
# every value is still read as a string.
df = spark.read.csv('WMT.csv', header=True)
df.head(10)

[Row(Date='2016-01-20', Open='61.799999', High='62.330002', Low='60.200001', Close='60.840000', Adj Close='53.990601', Volume='17369100'),
 Row(Date='2016-01-21', Open='60.980000', High='62.790001', Low='60.910000', Close='61.880001', Adj Close='54.913509', Volume='12089200'),
 Row(Date='2016-01-22', Open='62.439999', High='63.259998', Low='62.130001', Close='62.689999', Adj Close='55.632324', Volume='9197500'),
 Row(Date='2016-01-25', Open='62.779999', High='63.820000', Low='62.549999', Close='63.450001', Adj Close='56.306763', Volume='12823400'),
 Row(Date='2016-01-26', Open='63.360001', High='64.470001', Low='63.259998', Close='64.000000', Adj Close='56.794834', Volume='9441200'),
 Row(Date='2016-01-27', Open='64.099998', High='65.180000', Low='63.889999', Close='63.950001', Adj Close='56.750477', Volume='10214300'),
 Row(Date='2016-01-28', Open='64.029999', High='64.510002', Low='63.430000', Close='64.220001', Adj Close='56.990070', Volume='11278300'),
 Row(Date='2016-01-29', Open=

In [22]:
# Print the first 5 rows as a table, now with the real column names.
df.show(5)

+----------+---------+---------+---------+---------+---------+--------+
|      Date|     Open|     High|      Low|    Close|Adj Close|  Volume|
+----------+---------+---------+---------+---------+---------+--------+
|2016-01-20|61.799999|62.330002|60.200001|60.840000|53.990601|17369100|
|2016-01-21|60.980000|62.790001|60.910000|61.880001|54.913509|12089200|
|2016-01-22|62.439999|63.259998|62.130001|62.689999|55.632324| 9197500|
|2016-01-25|62.779999|63.820000|62.549999|63.450001|56.306763|12823400|
|2016-01-26|63.360001|64.470001|63.259998|64.000000|56.794834| 9441200|
+----------+---------+---------+---------+---------+---------+--------+
only showing top 5 rows



In [23]:
# Print each column's type; without schema inference every column is a string.
df.printSchema()

root
 |-- Date: string (nullable = true)
 |-- Open: string (nullable = true)
 |-- High: string (nullable = true)
 |-- Low: string (nullable = true)
 |-- Close: string (nullable = true)
 |-- Adj Close: string (nullable = true)
 |-- Volume: string (nullable = true)



In [24]:
# inferSchema=True lets Spark derive column types from the data
# (here: date for Date, double for the prices, integer for Volume).
df = spark.read.csv('WMT.csv', header=True, inferSchema=True)
df.head(10)

[Row(Date=datetime.date(2016, 1, 20), Open=61.799999, High=62.330002, Low=60.200001, Close=60.84, Adj Close=53.990601, Volume=17369100),
 Row(Date=datetime.date(2016, 1, 21), Open=60.98, High=62.790001, Low=60.91, Close=61.880001, Adj Close=54.913509, Volume=12089200),
 Row(Date=datetime.date(2016, 1, 22), Open=62.439999, High=63.259998, Low=62.130001, Close=62.689999, Adj Close=55.632324, Volume=9197500),
 Row(Date=datetime.date(2016, 1, 25), Open=62.779999, High=63.82, Low=62.549999, Close=63.450001, Adj Close=56.306763, Volume=12823400),
 Row(Date=datetime.date(2016, 1, 26), Open=63.360001, High=64.470001, Low=63.259998, Close=64.0, Adj Close=56.794834, Volume=9441200),
 Row(Date=datetime.date(2016, 1, 27), Open=64.099998, High=65.18, Low=63.889999, Close=63.950001, Adj Close=56.750477, Volume=10214300),
 Row(Date=datetime.date(2016, 1, 28), Open=64.029999, High=64.510002, Low=63.43, Close=64.220001, Adj Close=56.99007, Volume=11278300),
 Row(Date=datetime.date(2016, 1, 29), Open=64

In [25]:
# Confirm the inferred column types.
df.printSchema()

root
 |-- Date: date (nullable = true)
 |-- Open: double (nullable = true)
 |-- High: double (nullable = true)
 |-- Low: double (nullable = true)
 |-- Close: double (nullable = true)
 |-- Adj Close: double (nullable = true)
 |-- Volume: integer (nullable = true)



In [27]:
# Summary statistics (count, mean, stddev, min, max) for the numeric columns.
df.describe().show()

+-------+----------------+------------------+------------------+------------------+------------------+-----------------+
|summary|            Open|              High|               Low|             Close|         Adj Close|           Volume|
+-------+----------------+------------------+------------------+------------------+------------------+-----------------+
|  count|            1259|              1259|              1259|              1259|              1259|             1259|
|   mean|96.5083001715647| 97.33101670611597| 95.74480543367744| 96.54861796902313| 92.42759966243042|8509255.043685464|
| stddev|23.3274864439079|23.590538442427473|23.019708825126568|23.292869033013766|25.335281463390604|4760478.447370805|
|    min|           60.98|         62.330002|         60.200001|             60.84|         53.990601|          2227400|
|    max|      153.600006|        153.660004|        151.660004|        152.789993|        152.233536|         56233000|
+-------+----------------+------

In [28]:
# Keep only the rows whose Close is below 62; the condition is passed as a SQL expression string.
df.filter('Close < 62').show()

+----------+---------+---------+---------+---------+---------+--------+
|      Date|     Open|     High|      Low|    Close|Adj Close|  Volume|
+----------+---------+---------+---------+---------+---------+--------+
|2016-01-20|61.799999|62.330002|60.200001|    60.84|53.990601|17369100|
|2016-01-21|    60.98|62.790001|    60.91|61.880001|54.913509|12089200|
+----------+---------+---------+---------+---------+---------+--------+



In [30]:
# Same filter, but keep only the Open column in the result.
df.filter('Close < 62').select('Open').show()

+---------+
|     Open|
+---------+
|61.799999|
|    60.98|
+---------+



In [31]:
# collect() returns the result as a Python list of Row objects instead of printing a table.
df.filter('Close < 62').select('Open').collect()

[Row(Open=61.799999), Row(Open=60.98)]

In [32]:
# RDD

# Resilient Distributed Datasets (RDD)

# Analogy: just as everything in NumPy is built upon ndarray,
# a Spark DataFrame is built upon an RDD, which `.rdd` exposes.

df.rdd

MapPartitionsRDD[113] at javaToPython at NativeMethodAccessorImpl.java:0

In [35]:
# Number of partitions the DataFrame's underlying RDD is split into.
df.rdd.getNumPartitions()

1

In [36]:
# Keep a handle on the DataFrame's underlying RDD.
rdd_obj = df.rdd

In [37]:
# repartition(4) returns a new RDD whose data is redistributed across 4 partitions.
new_rdd = rdd_obj.repartition(4)

# Verify the partition count of the new RDD.
new_rdd.getNumPartitions()

4

In [38]:
# One million consecutive integers (0 .. 999_999) used as the workload for the timing cells.
numbers = list(range(1_000_000))


In [65]:
%time
res=filter(lambda i: i % 2,numbers)

CPU times: user 3 µs, sys: 0 ns, total: 3 µs
Wall time: 5.72 µs


In [41]:
from pyspark import SparkContext
# Get the SparkContext (used for the RDD API below), reusing the running one if there is one.
sc = SparkContext.getOrCreate()

In [43]:
# Distribute the Python list as an RDD and report how many partitions it was split into.
RDD_example = sc.parallelize(numbers)
print(RDD_example.getNumPartitions())

2


In [55]:
%time
o = RDD_example.filter(lambda i: i % 2) # This would be lot lesser for actual distributed computing

CPU times: user 2 µs, sys: 0 ns, total: 2 µs
Wall time: 4.77 µs
