# Problem 1

##### Load the JSON data from the file

In [13]:
import json
def load_data(path):
    with open(path, 'r') as f:
        data = json.load(f)
    return data
data = load_data('problem1.json')
print(data)

{'menu': {'header': 'SVG Viewer', 'items': [{'id': 'Open'}, {'id': 'OpenNew', 'label': 'Open New'}, None, {'id': 'ZoomIn', 'label': 'Zoom In'}, {'id': 'ZoomOut', 'label': 'Zoom Out'}, {'id': 'OriginalView', 'label': 'Original View'}, None, {'id': 'Quality'}, {'id': 'Pause'}, {'id': 'Mute'}, None, {'id': 'Find', 'label': 'Find...'}, {'id': 'FindAgain', 'label': 'Find Again'}, {'id': 'Copy'}, {'id': 'CopyAgain', 'label': 'Copy Again'}, {'id': 'CopySVG', 'label': 'Copy SVG'}, {'id': 'ViewSVG', 'label': 'View SVG'}, {'id': 'ViewSource', 'label': 'View Source'}, {'id': 'SaveAs', 'label': 'Save As'}, None, {'id': 'Help'}, {'id': 'About', 'label': 'About Adobe CVG Viewer...'}]}}


### a. Write a program to convert it to

#### MessagePack

In [30]:
import msgpack
    
#function to convert the data to MessagePack format
def convert_to_msgpack(data):
    return msgpack.packb(data)

#function to save the MessagePack data to a file
def save_msgpack(msgpack_data, file_path):      
    with open(file_path, 'wb') as msgpack_file:
        msgpack_file.write(msgpack_data)


# function to load the data again to check if it is correct
def load_msgpack(file_path):
    with open(file_path, 'rb') as msgpack_file:
        return msgpack.unpackb(msgpack_file.read())

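# convert the data to msgpack and save it to a file
msgpack_data = convert_to_msgpack(data)
save_msgpack(msgpack_data, 'problem1.msgpack')

# load the data back from the file
loaded_data = load_msgpack('problem1.msgpack')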


# check if the data is correct
assert data == loaded_data


- First, we load the data from the file using the `json` package.
- Then, we convert the loaded data to MessagePack using the `msgpack` package.
- Finally, we save the serialized bytes to a file and read them back to check the round trip.

![msgpack.png](attachment:msgpack.png)
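
To see where the savings over JSON come from, here is a minimal hand-written sketch of how MessagePack lays out small values. It is not the `msgpack` package's implementation: `tiny_pack` is a hypothetical helper that only handles `None`, strings of up to 31 bytes, and lists and dicts with up to 15 entries, which is enough for a single menu item.

```python
import json


def tiny_pack(value):
    """Encode None, short strings, small lists and small dicts as MessagePack bytes."""
    if value is None:
        return b"\xc0"  # nil
    if isinstance(value, str):
        raw = value.encode("utf-8")
        if len(raw) > 31:
            raise ValueError("sketch only supports fixstr (up to 31 bytes)")
        return bytes([0xA0 | len(raw)]) + raw  # fixstr: type and length share one byte
    if isinstance(value, list):
        if len(value) > 15:
            raise ValueError("sketch only supports fixarray (up to 15 elements)")
        return bytes([0x90 | len(value)]) + b"".join(tiny_pack(v) for v in value)
    if isinstance(value, dict):
        if len(value) > 15:
            raise ValueError("sketch only supports fixmap (up to 15 pairs)")
        body = b"".join(tiny_pack(k) + tiny_pack(v) for k, v in value.items())
        return bytes([0x80 | len(value)]) + body
    raise TypeError(f"unsupported type: {type(value).__name__}")


item = {"id": "OpenNew", "label": "Open New"}
packed = tiny_pack(item)
print("JSON bytes:", len(json.dumps(item)), "MessagePack bytes:", len(packed))
```

Quotes, colons, commas, and braces disappear, but the keys `id` and `label` are still written for every item, which is one reason MessagePack stays larger than the schema-based formats below.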


#### Avro

In [11]:
import fastavro
from io import BytesIO

# Define the Avro schema
avro_schema={
    "type": "record",
    "name": "MRecord",
    "fields": [
        {"name": "menu", "type": {
            "type": "record",
            "name": "Menu",
            "fields": [
                {"name": "header", "type": "string"},
                {"name": "items", "type": {
                    "type": "array",
                    "items": [
                        "null",
                        {
                            "type": "record",
                            "name": "Item",
                            "fields": [
                                {"name": "id", "type": "string"},
                                {"name": "label", "type": ["null", "string"]}
                            ]
                        }
                    ]
                }}
            ]
        }},
    ]
}


#Function to parse the Avro schema (fastavro validates it and returns the parsed form)
def convert_to_avro(avro_schema):
    return fastavro.parse_schema(avro_schema)

#Function to serialize the data with the parsed schema and save it to a file
def save_to_file(parser, data, file_path):
    with open(file_path, 'wb') as avro_file:
        # the writer expects an iterable of records, so the single record is wrapped in a list
        fastavro.writer(avro_file, parser, [data])

parser = convert_to_avro(avro_schema)
save_to_file(parser, data, 'problem1.avro')




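- First, we load the data from the file using the `json` package.
- Then, we define the Avro schema. The `None` separators in the list and the missing labels are modeled as unions with `"null"`.
- We parse the schema with `fastavro.parse_schema`.
- Finally, `fastavro.writer` serializes the record and writes it, together with the schema, to the file.

![avro.jpg](attachment:avro.jpg)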
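
Avro's binary encoding writes no field names or tags in the record body; the reader relies on the schema instead. The following is a rough sketch of how one element of the `items` array is encoded under the schema above, assuming the standard Avro rules (zigzag variable-length integers for string lengths and union branch indexes). The helper names `zigzag_varint`, `avro_string`, and `avro_item` are hypothetical, and this is not how `fastavro` is implemented internally.

```python
def zigzag_varint(n):
    """Encode a signed integer the way Avro encodes int/long values."""
    z = (n << 1) ^ (n >> 63)  # zigzag: small magnitudes map to small unsigned values
    out = bytearray()
    while True:
        byte = z & 0x7F
        z >>= 7
        if z:
            out.append(byte | 0x80)  # more bytes follow
        else:
            out.append(byte)
            return bytes(out)


def avro_string(text):
    """A string is its UTF-8 length (as a long) followed by the bytes."""
    raw = text.encode("utf-8")
    return zigzag_varint(len(raw)) + raw


def avro_item(item):
    """Encode one array element of type ["null", Item]."""
    if item is None:
        return zigzag_varint(0)  # union branch 0: null, no payload
    out = zigzag_varint(1)  # union branch 1: Item record
    out += avro_string(item["id"])
    label = item.get("label")
    if label is None:
        out += zigzag_varint(0)  # label union, branch 0: null
    else:
        out += zigzag_varint(1) + avro_string(label)  # label union, branch 1: string
    return out


print(avro_item({"id": "Open"}).hex())
print(avro_item(None).hex())
```

A separator costs a single byte and an item without a label costs only two bytes on top of its `id` text, which suggests why Avro scales well once the file holds many items.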

#### Protocol Buffers

In [19]:
import problem1_pb2

#Function to convert the data to Protobuf
def convert_to_protobuf(data):
    message = problem1_pb2.MyMessage()
    menu = message.menu

    menu.header = data['menu']['header']
    for item in data['menu']['items']:
        # every entry gets a slot; a None separator stays an empty Items message
        menu_item = menu.items.add()
        if item is None:
            continue
        menu_item.id = item['id']
        if 'label' in item:
            menu_item.label = item['label']
    # note: only the Menu sub-message is serialized here, not the MyMessage wrapper
    return menu.SerializeToString()

#Function to save the Protobuf data to a file
def save_protobuf(protobuf_data, file_path):
    with open(file_path, 'wb') as protobuf_file:
        protobuf_file.write(protobuf_data)

protobuf_data = convert_to_protobuf(data)
save_protobuf(protobuf_data, 'problem1.protobuf')

- First, we load the data from the file using the `json` package.
- Then, we define the schema in the protobuf file "problem1.proto":
```proto
syntax = "proto3";

message MyMessage {

    message Items {
        string id = 1;
        optional string label = 2;
    }

    message Menu {
        string header = 1;
        repeated Items items = 2;
    }

    Menu menu = 1;
}
```
- Generate the Python class from the protobuf file: ```protoc -I=. --python_out=. ./problem1.proto```
- Use this class to build the message from the JSON data and serialize it.
- Save the serialized data to a file.

![protobuf.jpg](attachment:protobuf.jpg)
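
Protobuf replaces field names with small numeric tags taken from the `.proto` file. As a hedged illustration, the sketch below hand-encodes one `Items` message following the Protocol Buffers wire format (a varint tag, a varint length, then the UTF-8 bytes). The helpers `varint`, `string_field`, and `proto_item` are hypothetical and are not part of the generated `problem1_pb2` module.

```python
def varint(n):
    """Encode a non-negative integer as a base-128 varint."""
    out = bytearray()
    while True:
        byte = n & 0x7F
        n >>= 7
        if n:
            out.append(byte | 0x80)  # continuation bit: more bytes follow
        else:
            out.append(byte)
            return bytes(out)


def string_field(field_number, text):
    """Encode a string field: tag, length, then the UTF-8 bytes."""
    raw = text.encode("utf-8")
    tag = (field_number << 3) | 2  # wire type 2 = length-delimited
    return varint(tag) + varint(len(raw)) + raw


def proto_item(item):
    """Encode an Items message: id is field 1, label is field 2 (only if present)."""
    out = string_field(1, item["id"])
    if "label" in item:
        out += string_field(2, item["label"])
    return out


print(proto_item({"id": "OpenNew", "label": "Open New"}).hex())
```

Each string costs two bytes of overhead (tag and length) instead of a quoted key, and an absent `label` costs nothing.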

#### Thrift


In [33]:
import sys
sys.path.append("gen-py")
from example.ttypes import Menu, Item
from thrift.protocol import TBinaryProtocol, TCompactProtocol
from thrift.transport import TTransport

#Function to convert the data to Thrift Binary format
def convert_to_thrift(data):
    menu = Menu()
    menu.header = data['menu']['header']
    menu.items = []

    for item in data['menu']['items']:
        if item is not None:
            menu.items.append(Item(id=item['id'], label=item.get('label', '')))
        else:
            # a None separator is kept as an Item with no fields set
            menu.items.append(Item(id=None, label=None))

    # convert the menu to binary format
    transport_out = TTransport.TMemoryBuffer()
    protocol_out = TBinaryProtocol.TBinaryProtocol(transport_out)
    menu.write(protocol_out)
    thrift_data = transport_out.getvalue()

    return thrift_data

#Function to convert the data to Thrift Compact format
def convert_to_thrift_compact(data):
    menu = Menu()
    menu.header = data['menu']['header']
    menu.items = []

    for item in data['menu']['items']:
        # None separators are skipped here, unlike in the binary version above
        if item is not None:
            menu.items.append(Item(id=item['id'], label=item.get('label', '')))

    # convert the menu to compact format
    transport_out = TTransport.TMemoryBuffer()
    protocol_out = TCompactProtocol.TCompactProtocol(transport_out)
    menu.write(protocol_out)
    thrift_data = transport_out.getvalue()

    return thrift_data

#Function to save the Thrift data to a file
def save_thrift(thrift_data, file_path):
    with open(file_path, 'wb') as thrift_file:
        thrift_file.write(thrift_data)

thrift_data_bin = convert_to_thrift(data)
thrift_data_compact = convert_to_thrift_compact(data)

save_thrift(thrift_data_bin, 'problem1_bin.thrift')
save_thrift(thrift_data_compact, 'problem1_compact.thrift')

- First, we load the data from the file using the `json` package.
- Then, we define the schema in the thrift file "problem1.thrift":
`````` thrift
namespace py example

struct Item {
    1: required string id,
    2: optional string label,
}

struct Menu {
    1: required string header,
    2: optional list<Item> items,
}
``````
- Generate the Python classes from the thrift file: ```thrift-0.19.0.exe -r --gen py problem1.thrift```
- Use the Thrift APIs to serialize an instance of the generated `Menu` class.
- Serialize with both Thrift protocols, "TBinaryProtocol" and "TCompactProtocol".
- Save the serialized data to a file.

* Thrift binary protocol

![image.png](attachment:image.png)

* Thrift compact protocol

![image-2.png](attachment:image-2.png)

### b. What are the sizes of the new files compared to the original one?

In [55]:
import os
print("Size of the JSON file: ", os.path.getsize('problem1.json'))
print("Size of the MessagePack file: ", os.path.getsize('problem1.msgpack'))
print("Size of the Avro file: ", os.path.getsize('problem1.avro'))
print("Size of the Protobuf file: ", os.path.getsize('problem1.protobuf'))
print("Size of the Thrift Binary file: ", os.path.getsize('problem1_bin.thrift'))
print("Size of the Thrift Compact file: ", os.path.getsize('problem1_compact.thrift'))

Size of the JSON file:  710
Size of the MessagePack file:  451
Size of the Avro file:  748
Size of the Protobuf file:  355
Size of the Thrift Binary file:  535
Size of the Thrift Compact file:  345
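
To make the comparison explicit, the sizes printed above can be expressed relative to the JSON file. This is a small sketch that hard-codes the reported numbers; `relative_sizes` is a hypothetical helper name.

```python
# Sizes in bytes, copied from the output above
sizes = {
    "JSON": 710,
    "MessagePack": 451,
    "Avro": 748,
    "Protobuf": 355,
    "Thrift Binary": 535,
    "Thrift Compact": 345,
}


def relative_sizes(sizes, baseline="JSON"):
    """Return each size as a fraction of the baseline size."""
    base = sizes[baseline]
    return {name: size / base for name, size in sizes.items()}


ratios = relative_sizes(sizes)
for name, ratio in sorted(ratios.items(), key=lambda kv: kv[1]):
    print(f"{name:15s} {sizes[name]:5d} bytes  {ratio:6.1%} of JSON")
```

Avro is the only format that comes out larger than the JSON file here. A likely reason is that an Avro container file stores the schema in its header, which outweighs the savings on such a small payload.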


### c. Repeat the same entry in this JSON file 100 times, and rerun your code that converts JSON into the four formats above. What are the new sizes of the new files compared to the new JSON file?

In [46]:
# repeat the 22 menu items 100 times (this modifies data in place)
items = list(data['menu']['items'])
print(len(items))
data['menu']['items'] = items * 100
print(len(data['menu']['items']))
# write to json file
with open('problem1_large.json', 'w') as f:
    json.dump(data, f)

22
2200


In [59]:
big_data = load_data('problem1_large.json')

# convert the data to msgpack
big_msgpack_data = convert_to_msgpack(big_data)
save_msgpack(big_msgpack_data, 'problem1_large.msgpack')

# convert the data to Avro
parser = convert_to_avro(avro_schema)
save_to_file(parser , big_data, 'problem1_large.avro')

# convert the data to Protobuf
big_protobuf_data = convert_to_protobuf(big_data)
save_protobuf(big_protobuf_data, 'problem1_large.protobuf')

# convert the data to Thrift
big_thrift_data_bin = convert_to_thrift(big_data)
big_thrift_data_compact = convert_to_thrift_compact(big_data)
save_thrift(big_thrift_data_bin, 'problem1_large_bin.thrift')
save_thrift(big_thrift_data_compact, 'problem1_large_compact.thrift')

In [60]:
print("Size of the JSON file: ", os.path.getsize('problem1_large.json'))
print("Size of the MessagePack file: ", os.path.getsize('problem1_large.msgpack'))
print("Size of the Avro file: ", os.path.getsize('problem1_large.avro'))
print("Size of the Protobuf file: ", os.path.getsize('problem1_large.protobuf'))
print("Size of the Thrift Binary file: ", os.path.getsize('problem1_large_bin.thrift'))
print("Size of the Thrift Compact file: ", os.path.getsize('problem1_large_compact.thrift'))

Size of the JSON file:  63545
Size of the MessagePack file:  41734
Size of the Avro file:  31341
Size of the Protobuf file:  34312
Size of the Thrift Binary file:  50926
Size of the Thrift Compact file:  32917
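
Comparing the two sets of measurements gives a rough split between a one-time cost and a cost per repetition of the 22 items. The sketch below assumes the file size grows linearly with the number of repetitions, which is an approximation, and `split_overhead` is a hypothetical helper. The JSON row is only indicative, because the original file and the `json.dump` output may be formatted differently.

```python
# Sizes in bytes, copied from the two outputs above
small = {"JSON": 710, "MessagePack": 451, "Avro": 748,
         "Protobuf": 355, "Thrift Binary": 535, "Thrift Compact": 345}
large = {"JSON": 63545, "MessagePack": 41734, "Avro": 31341,
         "Protobuf": 34312, "Thrift Binary": 50926, "Thrift Compact": 32917}
REPEATS = 100


def split_overhead(small_size, large_size, repeats=REPEATS):
    """Fit size = fixed + per_copy * n through the points n=1 and n=repeats."""
    per_copy = (large_size - small_size) / (repeats - 1)
    fixed = small_size - per_copy
    return fixed, per_copy


for name in small:
    fixed, per_copy = split_overhead(small[name], large[name])
    ratio = large[name] / large["JSON"]
    print(f"{name:15s} fixed ~{fixed:6.1f} B  per copy ~{per_copy:6.1f} B  {ratio:6.1%} of JSON")
```

Under this assumption Avro has by far the largest fixed cost but the smallest cost per copy, which is consistent with it being the largest file in part b and the smallest in part c.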


# Problem 2

### a.

- The field `name` has no effect on compatibility because it is the same in both schemas.
- The field `salary` becomes nullable in the new schema (`["null", "long"]`). This breaks forward compatibility, because the old schema cannot read a null value in this field. It has no effect on backward compatibility, because the new schema can still read a plain `long`.
- The fields added in the new schema (`family_name` and `age`) have no default values, which breaks backward compatibility: the new schema cannot read data written with the old schema, because these fields are missing and there is no default to fall back on.
- Therefore, the two schemas are neither forward nor backward compatible.

### b.
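- The fields `salary` and `name` have no effect on compatibility because they are the same in both schemas.
- The field `address` is removed in the new schema, but it has a default value in the old schema. The old schema can therefore still read new data, and the new schema simply ignores `address` when reading old data.
- Therefore, the two schemas are forward and backward compatible.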

### c.
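- As in part b, the fields `salary` and `name` have no effect on compatibility because they are the same in both schemas.
- The removed field `address` has a default value in the old schema, and the added field `active` has a default value in the new schema, so the reader on either side can fill in the field it does not find.
- Therefore, the two schemas are forward and backward compatible.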
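
The reasoning in parts a to c can be sketched as a small checker. This is a simplified illustration of Avro's schema resolution rules, not a replacement for a real compatibility library: the hypothetical `can_read` function handles only primitive type names, unions, and flat records, and it ignores type promotion, aliases, and nested named types. The schemas are the ones used in part d.

```python
OLD = {"type": "record", "name": "Employee", "fields": [
    {"name": "address", "type": "string", "default": "Egypt"},
    {"name": "salary", "type": "long"},
    {"name": "name", "type": "string"},
]}
SCHEMA_A = {"type": "record", "name": "Employee", "fields": [
    {"name": "name", "type": "string"},
    {"name": "family_name", "type": "string"},
    {"name": "salary", "type": ["null", "long"]},
    {"name": "age", "type": "long"},
]}
SCHEMA_B = {"type": "record", "name": "Employee", "fields": [
    {"name": "salary", "type": "long"},
    {"name": "name", "type": "string"},
]}
SCHEMA_C = {"type": "record", "name": "Employee", "fields": [
    {"name": "name", "type": "string"},
    {"name": "active", "type": "boolean", "default": True},
    {"name": "salary", "type": "long"},
]}


def can_read(reader, writer):
    """Return True if data written with `writer` can be decoded with `reader`."""
    if isinstance(writer, list):
        # writer union: every branch the writer may emit must be readable
        return all(can_read(reader, branch) for branch in writer)
    if isinstance(reader, list):
        # reader union: the writer's type must match at least one branch
        return any(can_read(branch, writer) for branch in reader)
    if isinstance(reader, dict) and isinstance(writer, dict):
        if reader.get("type") == "record" and writer.get("type") == "record":
            written = {f["name"]: f["type"] for f in writer["fields"]}
            for field in reader["fields"]:
                if field["name"] in written:
                    if not can_read(field["type"], written[field["name"]]):
                        return False
                elif "default" not in field:
                    return False  # reader needs a value the writer never wrote
            return True
    return reader == writer  # primitives: names must match exactly in this sketch


def report(new, old=OLD):
    """Backward: new schema reads old data. Forward: old schema reads new data."""
    return {"backward": can_read(new, old), "forward": can_read(old, new)}


for label, schema in [("a", SCHEMA_A), ("b", SCHEMA_B), ("c", SCHEMA_C)]:
    print(label, report(schema))
```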

### d.
 
- I used a Java library to check the compatibility: [avro-compatibility](https://github.com/ExpediaGroup/avro-compatibility.git)

- It reports that the first new schema (part a) is not compatible with the old one.
- It reports that the second new schema (part b) is compatible with the old one.
- It reports that the third new schema (part c) is compatible with the old one.

`````` java
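import org.apache.avro.Schema;
import org.apache.avro.SchemaParseException;
// the Compatibility class comes from the avro-compatibility library linked above

public class Main {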
    public static String oldSchemaString = "{\n" +
            "  \"type\": \"record\",\n" +
            "  \"name\": \"Employee\",\n" +
            "  \"fields\": [\n" +
            "    {\"name\": \"address\", \"type\": \"string\", \"default\": \"Egypt\"},\n" +
            "    {\"name\": \"salary\", \"type\": \"long\"},\n" +
            "    {\"name\": \"name\", \"type\": \"string\"}\n" +
            "  ]\n" +
            "}";
    public static Schema oldSchema = parseSchema(oldSchemaString);

    public static final String schemaString1 = "{\n" +
            "  \"type\": \"record\",\n" +
            "  \"name\": \"Employee\",\n" +
            "  \"fields\": [\n" +
            "    {\"name\": \"name\", \"type\": \"string\"},\n" +
            "    {\"name\": \"family_name\", \"type\": \"string\"},\n" +
            "    {\"name\": \"salary\", \"type\": [\"null\", \"long\"]},\n" +
            "    {\"name\": \"age\", \"type\": \"long\"}\n" +
            "  ]\n" +
            "}\n";
    public static Schema schema1 = parseSchema(schemaString1);
    public static final String schemaString2 = "{\n" +
            "  \"type\": \"record\",\n" +
            "  \"name\": \"Employee\",\n" +
            "  \"fields\": [\n" +
            "    {\"name\": \"salary\", \"type\": \"long\"},\n" +
            "    {\"name\": \"name\", \"type\": \"string\"}\n" +
            "  ]\n" +
            "}\n";
    public static Schema schema2 = parseSchema(schemaString2);
    public static final String schemaString3 = "{\n" +
            "  \"type\": \"record\",\n" +
            "  \"name\": \"Employee\",\n" +
            "  \"fields\": [\n" +
            "    {\"name\": \"name\", \"type\": \"string\"},\n" +
            "    {\"name\": \"active\", \"type\": \"boolean\", \"default\": true},\n" +
            "    {\"name\": \"salary\", \"type\": \"long\"}\n" +
            "  ]\n" +
            "}\n";
    public static Schema schema3 = parseSchema(schemaString3);

    static Schema parseSchema(String schemaString) {
        try {
            Schema.Parser parser1 = new Schema.Parser();
            Schema schema = parser1.parse(schemaString);
            return schema;
        } catch (SchemaParseException e) {
            return null;
        }
    }

    public static void main(String[] args) {
        //check compatibility between oldSchema and schema1
        System.out.println(Compatibility.checkThat(oldSchema).canRead(schema1).asMessage());
        System.out.println(Compatibility.checkThat(schema1).canBeReadBy(oldSchema).asMessage());
        //check compatibility between oldSchema and schema2
        System.out.println(Compatibility.checkThat(oldSchema).canRead(schema2).asMessage());
        System.out.println(Compatibility.checkThat(schema2).canBeReadBy(oldSchema).asMessage());
        //check compatibility between oldSchema and schema3
        System.out.println(Compatibility.checkThat(oldSchema).canRead(schema3).asMessage());
        System.out.println(Compatibility.checkThat(schema3).canBeReadBy(oldSchema).asMessage());


    }
}
``````

- The output
![image.png](attachment:image.png)

# Problem 3

### a. Is Parquet a columnar-oriented, row-oriented, or hybrid. Explain.

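- Parquet is a column-oriented storage format. Values from the same column are stored together, which benefits analytical queries because only the necessary columns are read during query execution. More precisely, the file is first split horizontally into row groups, and within each row group the data is stored column by column, which is why Parquet is sometimes described as a hybrid layout.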

[reference](https://airbyte.com/data-engineering-resources/parquet-vs-avro#:~:text=Parquet%20and%20Avro%20are%20two,Avro%20and%20their%20key%20features.)

### b. List and briefly explain 3 different data encoding techniques used in Parquet.

- Run-Length Encoding (RLE): represents repeated values efficiently by encoding each run as a single value followed by the number of times it repeats.
- Dictionary Encoding: builds a dictionary of the unique values in a column and replaces the actual values with dictionary indices.
- Delta Encoding: stores only the difference between each value and the previous value in the column.

[reference](https://www.linkedin.com/pulse/encodings-parquet-akhil-pathirippilly-mana/)
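
The three ideas above can be illustrated with toy versions in plain Python. These are simplified sketches with hypothetical function names; Parquet's actual encodings add bit-packing and block structure on top of the same principles.

```python
from itertools import groupby


def run_length_encode(values):
    """Collapse each run of equal values into a (value, run_length) pair."""
    return [(value, len(list(run))) for value, run in groupby(values)]


def dictionary_encode(values):
    """Return (dictionary, indices): unique values in first-seen order plus one index per value."""
    dictionary = []
    index_of = {}
    indices = []
    for value in values:
        if value not in index_of:
            index_of[value] = len(dictionary)
            dictionary.append(value)
        indices.append(index_of[value])
    return dictionary, indices


def delta_encode(values):
    """Keep the first value, then store each value as its difference from the previous one."""
    if not values:
        return []
    return [values[0]] + [b - a for a, b in zip(values, values[1:])]


print(run_length_encode(["a", "a", "a", "b"]))
print(dictionary_encode(["Cairo", "Giza", "Cairo"]))
print(delta_encode([100, 101, 103, 103]))
```

Each technique pays off on a different kind of column: long runs of repeats for RLE, few distinct values for dictionary encoding, and slowly changing numbers such as timestamps for delta encoding.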

### c. Where is Parquet file metadata located in the file ?

- The Parquet metadata is stored at the end of the file, in the footer. It includes essential information about the file, such as the schema, the row groups, and the individual columns.

[reference](https://www.restack.io/docs/openmetadata-knowledge-read-parquet-metadata-openmetadata)
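
As a sketch of what "at the end of the file" means in practice: a Parquet file ends with a 4-byte little-endian length of the file metadata followed by the magic bytes `PAR1`, so a reader can locate the metadata from the last 8 bytes. The function name `footer_metadata_range` and the fake in-memory file are illustrative assumptions. The metadata bytes themselves are Thrift-encoded, and decoding them is left out here.

```python
import io
import struct

MAGIC = b"PAR1"


def footer_metadata_range(f):
    """Return (offset, length) of the serialized file metadata in a Parquet file object."""
    f.seek(0, io.SEEK_END)
    file_size = f.tell()
    if file_size < 12:  # leading magic + footer length + trailing magic
        raise ValueError("file too small to be a Parquet file")
    f.seek(file_size - 8)
    tail = f.read(8)
    if tail[4:] != MAGIC:
        raise ValueError("missing PAR1 magic at end of file")
    (length,) = struct.unpack("<I", tail[:4])  # little-endian 32-bit footer length
    return file_size - 8 - length, length


# A fake file with the same framing: magic, 20 data bytes, 10 metadata bytes, length, magic
fake = MAGIC + b"\x00" * 20 + b"M" * 10 + struct.pack("<I", 10) + MAGIC
print(footer_metadata_range(io.BytesIO(fake)))
```

With a real file, the same function could be called on `open(path, 'rb')`. Keeping the metadata at the end lets a writer stream the row groups first and record their offsets once they are known.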